]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
sendfile commit didn't compile if sendfile disable
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.19 2009-10-25 12:09:00 didg Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37
38 #include <atalk/dsi.h>
39 #include <netatalk/endian.h>
40 #include <atalk/util.h>
41 #include <sys/ioctl.h> 
42
43 #define min(a,b)  ((a) < (b) ? (a) : (b))
44
45 #ifndef MSG_MORE
46 #define MSG_MORE 0x8000
47 #endif
48
49 #ifndef MSG_DONTWAIT
50 #define MSG_DONTWAIT 0x40
51 #endif
52
53 /* ------------------------- 
54  * we don't use a circular buffer.
55 */
56 static void dsi_init_buffer(DSI *dsi)
57 {
58     if (!dsi->buffer) {
59         /* XXX config options */
60         dsi->maxsize = 6 * dsi->server_quantum;
61         if (!dsi->maxsize)
62             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
63         dsi->buffer = malloc(dsi->maxsize);
64         if (!dsi->buffer) {
65             return;
66         }
67         dsi->start = dsi->buffer;
68         dsi->eof = dsi->buffer;
69         dsi->end = dsi->buffer + dsi->maxsize;
70     }
71 }
72
73 /* ---------------------- 
74    afpd is sleeping too much while trying to send something.
75    May be there's no reader or the reader is also sleeping in write,
76    look if there's some data for us to read, hopefully it will wake up
77    the reader
78 */
79 static int dsi_buffer(DSI *dsi)
80 {
81     fd_set readfds, writefds;
82     int    len;
83     int    maxfd;
84     int adr;
85
86     /* non blocking mode */
87     adr = 1;
88     if (ioctl(dsi->socket, FIONBIO, &adr) < 0) {
89         /* can't do it! exit without error it will sleep to death below */
90         LOG(log_error, logtype_default, "dsi_buffer: ioctl non blocking mode %s", strerror(errno));
91         return 0;
92     }
93     
94     FD_ZERO(&readfds);
95     FD_ZERO(&writefds);
96     FD_SET( dsi->socket, &readfds);
97     FD_SET( dsi->socket, &writefds);
98     maxfd = dsi->socket +1;
99     while (1) {
100         FD_SET( dsi->socket, &readfds);
101         FD_SET( dsi->socket, &writefds);
102         if (select( maxfd, &readfds, &writefds, NULL, NULL) <= 0)
103             break;
104
105         if ( !FD_ISSET(dsi->socket, &readfds)) {
106             /* nothing waiting in the read queue */
107             break;
108         }
109         dsi_init_buffer(dsi);
110         len = dsi->end - dsi->eof;
111
112         if (len <= 0) {
113             /* ouch, our buffer is full ! 
114              * fall back to blocking IO 
115              * could block and disconnect but it's better than a cpu hog
116              */
117             break;
118         }
119
120         len = read(dsi->socket, dsi->eof, len);
121         if (len <= 0)
122             break;
123         dsi->eof += len;
124         if ( FD_ISSET(dsi->socket, &writefds)) {
125             /* we can write again at last */
126             break;
127         }
128     }
129     adr = 0;
130     if (ioctl(dsi->socket, FIONBIO, &adr) < 0) {
131         /* can't do it! afpd will fail very quickly */
132         LOG(log_error, logtype_default, "dsi_buffer: ioctl blocking mode %s", strerror(errno));
133         return -1;
134     }
135     return 0;
136 }
137
138 /* ------------------------------
139  * write raw data. return actual bytes read. checks against EINTR
140  * aren't necessary if all of the signals have SA_RESTART
141  * specified. */
142 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
143 {
144   size_t written;
145   ssize_t len;
146 #if 0
147   /* FIXME sometime it's slower */
148   unsigned int flags = (mode)?MSG_MORE:0;
149 #endif
150   unsigned int flags = 0;
151
152 #if 0
153   /* XXX there's no MSG_DONTWAIT in recv ?? so we have to play with ioctl
154   */ 
155   flags |= MSG_DONTWAIT;
156 #endif
157   
158   dsi->in_write++;
159   written = 0;
160   while (written < length) {
161     if ((-1 == (len = send(dsi->socket, (u_int8_t *) data + written,
162                       length - written, flags)) && errno == EINTR) ||
163         !len)
164       continue;
165
166     if (len < 0) {
167       if (errno == EAGAIN || errno == EWOULDBLOCK) {
168           if (mode == DSI_NOWAIT && written == 0) {
169               /* DSI_NOWAIT is used by attention
170                  give up in this case.
171               */
172               return -1;
173           }
174           if (dsi_buffer(dsi)) {
175               /* can't go back to blocking mode, exit, the next read
176                  will return with an error and afpd will die.
177               */
178               break;
179           }
180           continue;
181       }
182       LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
183       break;
184     }
185     else {
186         written += len;
187     }
188   }
189
190   dsi->write_count += written;
191   dsi->in_write--;
192   return written;
193 }
194
195
196 /* ---------------------------------
197 */
198 #ifdef WITH_SENDFILE
199 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
200 {
201   size_t written;
202   ssize_t len;
203
204   dsi->in_write++;
205   written = 0;
206
207   while (written < length) {
208     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
209         
210     if (len < 0) {
211       if (errno == EINTR)
212           continue;
213       if (errno == EINVAL || errno == ENOSYS)
214           return -1;
215           
216       if (errno == EAGAIN || errno == EWOULDBLOCK) {
217           if (dsi_buffer(dsi)) {
218               /* can't go back to blocking mode, exit, the next read
219                  will return with an error and afpd will die.
220               */
221               break;
222           }
223           continue;
224       }
225       LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
226       break;
227     }
228     else if (!len) {
229         /* afpd is going to exit */
230         errno = EIO;
231         return -1; /* I think we're at EOF here... */
232     }
233     else 
234         written += len;
235   }
236
237   dsi->write_count += written;
238   dsi->in_write--;
239   return written;
240 }
241 #endif
242
243 /* ---------------------------------
244 */
245 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
246 {
247     size_t nbe = 0;
248     
249     if (dsi->start) {        
250         nbe = dsi->eof - dsi->start;
251
252         if (nbe > 0) {
253            nbe = min((size_t)nbe, count);
254            memcpy(buf, dsi->start, nbe);
255            dsi->start += nbe;
256
257            if (dsi->eof == dsi->start) 
258                dsi->start = dsi->eof = dsi->buffer;
259
260         }
261     }
262     return nbe;
263 }
264
265 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
266 {
267     ssize_t nbe;
268     
269     if (!count)
270         return 0;
271
272     nbe = from_buf(dsi, buf, count);
273     if (nbe)
274         return nbe;
275   
276     return read(dsi->socket, buf, count);
277
278 }
279
280 /* ---------------------------------------
281  * read raw data. return actual bytes read. this will wait until 
282  * it gets length bytes 
283  */
284 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
285 {
286   size_t stored;
287   ssize_t len;
288   
289   stored = 0;
290   while (stored < length) {
291     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
292     if (len == -1 && errno == EINTR)
293       continue;
294     else if (len > 0)
295       stored += len;
296     else { /* eof or error */
297       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
298       if (len || stored || dsi->read_count) {
299           LOG(log_error, logtype_default, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
300       }
301       break;
302     }
303   }
304
305   dsi->read_count += stored;
306   return stored;
307 }
308
309 /* ---------------------------------------
310  * read raw data. return actual bytes read. this will wait until 
311  * it gets length bytes 
312  */
313 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
314 {
315   size_t len;
316   size_t buflen;
317   
318   dsi_init_buffer(dsi);
319   len = from_buf(dsi, data, length);
320   dsi->read_count += len;
321   if (len == length) {
322       return len;
323   }
324   
325   buflen = min(8192, dsi->end - dsi->eof);
326   if (buflen > 0) {
327       ssize_t ret;
328       ret = read(dsi->socket, dsi->eof, buflen);
329       if (ret > 0)
330           dsi->eof += ret;
331   }
332   return dsi_stream_read(dsi, data, length -len);
333 }
334
335 /* ---------------------------------------
336 */
337 void dsi_sleep(DSI *dsi, const int state)
338 {
339     dsi->asleep = state;
340 }
341
342 /* ---------------------------------------
343 */
344 static void block_sig(DSI *dsi)
345 {
346   dsi->in_write++;
347 }
348
349 /* ---------------------------------------
350 */
351 static void unblock_sig(DSI *dsi)
352 {
353   dsi->in_write--;
354 }
355
356 /* ---------------------------------------
357  * write data. 0 on failure. this assumes that dsi_len will never
358  * cause an overflow in the data buffer. 
359  */
360 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
361 {
362   char block[DSI_BLOCKSIZ];
363 #ifdef USE_WRITEV
364   struct iovec iov[2];
365   size_t towrite;
366   ssize_t len;
367 #endif /* USE_WRITEV */
368
369   block[0] = dsi->header.dsi_flags;
370   block[1] = dsi->header.dsi_command;
371   memcpy(block + 2, &dsi->header.dsi_requestID, 
372          sizeof(dsi->header.dsi_requestID));
373   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
374   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
375   memcpy(block + 12, &dsi->header.dsi_reserved,
376          sizeof(dsi->header.dsi_reserved));
377
378   if (!length) { /* just write the header */
379     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
380     return length; /* really 0 on failure, 1 on success */
381   }
382   
383   /* block signals */
384   block_sig(dsi);
385 #ifdef USE_WRITEV
386   iov[0].iov_base = block;
387   iov[0].iov_len = sizeof(block);
388   iov[1].iov_base = buf;
389   iov[1].iov_len = length;
390   
391   towrite = sizeof(block) + length;
392   dsi->write_count += towrite;
393   while (towrite > 0) {
394     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
395         !len)
396       continue;
397     
398     if ((size_t)len == towrite) /* wrote everything out */
399       break;
400     else if (len < 0) { /* error */
401       if (errno == EAGAIN || errno == EWOULDBLOCK) {
402           if (!dsi_buffer(dsi)) {
403               continue;
404           }
405       }
406       LOG(log_error, logtype_default, "dsi_stream_send: %s", strerror(errno));
407       unblock_sig(dsi);
408       return 0;
409     }
410     
411     towrite -= len;
412     if (towrite > length) { /* skip part of header */
413       iov[0].iov_base = (char *) iov[0].iov_base + len;
414       iov[0].iov_len -= len;
415     } else { /* skip to data */
416       if (iov[0].iov_len) {
417         len -= iov[0].iov_len;
418         iov[0].iov_len = 0;
419       }
420       iov[1].iov_base = (char *) iov[1].iov_base + len;
421       iov[1].iov_len -= len;
422     }
423   }
424   
425 #else /* USE_WRITEV */
426   /* write the header then data */
427   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
428             (dsi_stream_write(dsi, buf, length, 0) != length)) {
429       unblock_sig(dsi);
430       return 0;
431   }
432 #endif /* USE_WRITEV */
433
434   unblock_sig(dsi);
435   return 1;
436 }
437
438
439 /* ---------------------------------------
440  * read data. function on success. 0 on failure. data length gets
441  * stored in length variable. this should really use size_t's, but
442  * that would require changes elsewhere. */
443 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
444                        size_t *rlength)
445 {
446   char block[DSI_BLOCKSIZ];
447
448   /* read in the header */
449   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
450     return 0;
451
452   dsi->header.dsi_flags = block[0];
453   dsi->header.dsi_command = block[1];
454   /* FIXME, not the right place, 
455      but we get a server disconnect without reason in the log
456   */
457   if (!block[1]) {
458       LOG(log_error, logtype_default, "dsi_stream_receive: invalid packet, fatal");
459       return 0;
460   }
461
462   memcpy(&dsi->header.dsi_requestID, block + 2, 
463          sizeof(dsi->header.dsi_requestID));
464   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
465   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
466   memcpy(&dsi->header.dsi_reserved, block + 12,
467          sizeof(dsi->header.dsi_reserved));
468   dsi->clientID = ntohs(dsi->header.dsi_requestID);
469   
470   /* make sure we don't over-write our buffers. */
471   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
472   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
473     return 0;
474
475   return block[1];
476 }