]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Better handling of connection errors and disconnect state
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <netatalk/endian.h>
39 #include <atalk/util.h>
40
41 #define min(a,b)  ((a) < (b) ? (a) : (b))
42
43 #ifndef MSG_MORE
44 #define MSG_MORE 0x8000
45 #endif
46
47 #ifndef MSG_DONTWAIT
48 #define MSG_DONTWAIT 0x40
49 #endif
50
51 /* ------------------------- 
52  * we don't use a circular buffer.
53 */
54 static void dsi_init_buffer(DSI *dsi)
55 {
56     if (!dsi->buffer) {
57         /* XXX config options */
58         dsi->maxsize = 6 * dsi->server_quantum;
59         if (!dsi->maxsize)
60             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61         dsi->buffer = malloc(dsi->maxsize);
62         if (!dsi->buffer) {
63             return;
64         }
65         dsi->start = dsi->buffer;
66         dsi->eof = dsi->buffer;
67         dsi->end = dsi->buffer + dsi->maxsize;
68     }
69 }
70
71 /* ---------------------- 
72    afpd is sleeping too much while trying to send something.
73    May be there's no reader or the reader is also sleeping in write,
74    look if there's some data for us to read, hopefully it will wake up
75    the reader so we can write again.
76 */
77 static int dsi_peek(DSI *dsi)
78 {
79     fd_set readfds, writefds;
80     int    len;
81     int    maxfd;
82     int    ret;
83
84     FD_ZERO(&readfds);
85     FD_ZERO(&writefds);
86     FD_SET( dsi->socket, &readfds);
87     FD_SET( dsi->socket, &writefds);
88     maxfd = dsi->socket +1;
89
90     while (1) {
91         FD_SET( dsi->socket, &readfds);
92         FD_SET( dsi->socket, &writefds);
93
94         /* No timeout: if there's nothing to read nor nothing to write,
95          * we've got nothing to do at all */
96         if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
97             if (ret == -1 && errno == EINTR)
98                 /* we might have been interrupted by out timer, so restart select */
99                 continue;
100             /* give up */
101             break;
102         }
103
104         /* Check if there's sth to read, hopefully reading that will unblock the client */
105         if (FD_ISSET(dsi->socket, &readfds)) {
106             dsi_init_buffer(dsi);
107             len = dsi->end - dsi->eof;
108
109             if (len <= 0) {
110                 /* ouch, our buffer is full ! fall back to blocking IO 
111                  * could block and disconnect but it's better than a cpu hog */
112                 break;
113             }
114
115             len = read(dsi->socket, dsi->eof, len);
116             if (len <= 0)
117                 break;
118             dsi->eof += len;
119         }
120
121         if (FD_ISSET(dsi->socket, &writefds))
122             /* we can write again at last */
123             break;
124     }
125
126     return 0;
127 }
128
129 /* ------------------------------
130  * write raw data. return actual bytes read. checks against EINTR
131  * aren't necessary if all of the signals have SA_RESTART
132  * specified. */
133 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
134 {
135   size_t written;
136   ssize_t len;
137   unsigned int flags = 0;
138
139   dsi->in_write++;
140   written = 0;
141
142   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
143
144   /* non blocking mode */
145   if (setnonblock(dsi->socket, 1) < 0) {
146       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
147       return -1;
148   }
149
150   while (written < length) {
151       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
152       if (len >= 0) {
153           written += len;
154           continue;
155       }
156
157       if (errno == EINTR)
158           continue;
159
160       if (errno == EAGAIN || errno == EWOULDBLOCK) {
161           if (mode == DSI_NOWAIT && written == 0) {
162               /* DSI_NOWAIT is used by attention give up in this case. */
163               written = -1;
164               goto exit;
165           }
166
167           /* Try to read sth. in order to break up possible deadlock */
168           if (dsi_peek(dsi) != 0) {
169               written = -1;
170               goto exit;
171           }
172           /* Now try writing again */
173           continue;
174       }
175
176       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
177       written = -1;
178       goto exit;
179   }
180
181   dsi->write_count += written;
182
183 exit:
184   if (setnonblock(dsi->socket, 0) < 0) {
185       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
186       written = -1;
187   }
188
189   dsi->in_write--;
190   return written;
191 }
192
193
194 /* ---------------------------------
195 */
196 #ifdef WITH_SENDFILE
197 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
198 {
199   size_t written;
200   ssize_t len;
201
202   dsi->in_write++;
203   written = 0;
204
205   /* non blocking mode */
206   if (setnonblock(dsi->socket, 1) < 0) {
207       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
208       return -1;
209   }
210
211   while (written < length) {
212     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
213         
214     if (len < 0) {
215       if (errno == EINTR)
216           continue;
217       if (errno == EINVAL || errno == ENOSYS)
218           return -1;
219           
220       if (errno == EAGAIN || errno == EWOULDBLOCK) {
221           if (dsi_peek(dsi)) {
222               /* can't go back to blocking mode, exit, the next read
223                  will return with an error and afpd will die.
224               */
225               break;
226           }
227           continue;
228       }
229       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
230       break;
231     }
232     else if (!len) {
233         /* afpd is going to exit */
234         errno = EIO;
235         return -1; /* I think we're at EOF here... */
236     }
237     else 
238         written += len;
239   }
240
241   if (setnonblock(dsi->socket, 0) < 0) {
242       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
243       return -1;
244   }
245
246   dsi->write_count += written;
247   dsi->in_write--;
248   return written;
249 }
250 #endif
251
252 /* 
253  * Return all bytes up to count from dsi->buffer if there are any buffered there
254  */
255 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
256 {
257     size_t nbe = 0;
258     
259     if (dsi->start) {        
260         nbe = dsi->eof - dsi->start;
261
262         if (nbe > 0) {
263            nbe = min((size_t)nbe, count);
264            memcpy(buf, dsi->start, nbe);
265            dsi->start += nbe;
266
267            if (dsi->eof == dsi->start) 
268                dsi->start = dsi->eof = dsi->buffer;
269
270         }
271     }
272     return nbe;
273 }
274
275 /*
276  * Get bytes from buffer dsi->buffer or read from socket
277  *
278  * 1. Check if there are bytes in the the dsi->buffer buffer.
279  * 2. Return bytes from (1) if yes.
280  *    Note: this may return fewer bytes then requested in count !!
281  * 3. If the buffer was empty, read from the socket.
282  */
283 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
284 {
285     ssize_t nbe;
286     
287     if (!count)
288         return 0;
289
290     nbe = from_buf(dsi, buf, count); /* 1. */
291     if (nbe)
292         return nbe;             /* 2. */
293   
294     return read(dsi->socket, buf, count); /* 3. */
295 }
296
297 /*
298  * Essentially a loop around buf_read() to ensure "length" bytes are read
299  * from dsi->buffer and/or the socket.
300  */
301 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
302 {
303   size_t stored;
304   ssize_t len;
305   
306   stored = 0;
307   while (stored < length) {
308     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
309     if (len == -1 && errno == EINTR)
310       continue;
311     else if (len > 0)
312       stored += len;
313     else { /* eof or error */
314       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
315       if (len || stored || dsi->read_count) {
316           if (! (dsi->flags & DSI_DISCONNECTED)
317 )
318               LOG(log_error, logtype_dsi, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
319       }
320       break;
321     }
322   }
323
324   dsi->read_count += stored;
325   return stored;
326 }
327
328 /*
329  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
330  * this tries to read larger chunks (8192 bytes) into a buffer.
331  */
332 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
333 {
334   size_t len;
335   size_t buflen;
336   
337   dsi_init_buffer(dsi);
338   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
339   dsi->read_count += len;
340   if (len == length) {          /* got enough bytes from there ? */
341       return len;               /* yes */
342   }
343
344   /* fill the buffer with 8192 bytes or until buffer is full */
345   buflen = min(8192, dsi->end - dsi->eof);
346   if (buflen > 0) {
347       ssize_t ret;
348       ret = read(dsi->socket, dsi->eof, buflen);
349       if (ret > 0)
350           dsi->eof += ret;
351   }
352
353   /* now get the remaining data */
354   len += dsi_stream_read(dsi, data + len, length - len);
355   return len;
356 }
357
358 /* ---------------------------------------
359 */
360 void dsi_sleep(DSI *dsi, const int state)
361 {
362     if (state)
363         dsi->flags |= DSI_SLEEPING;
364     else
365         dsi->flags &= ~DSI_SLEEPING;
366 }
367
368 /* ---------------------------------------
369 */
370 static void block_sig(DSI *dsi)
371 {
372   dsi->in_write++;
373 }
374
375 /* ---------------------------------------
376 */
377 static void unblock_sig(DSI *dsi)
378 {
379   dsi->in_write--;
380 }
381
382 /* ---------------------------------------
383  * write data. 0 on failure. this assumes that dsi_len will never
384  * cause an overflow in the data buffer. 
385  */
386 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
387 {
388   char block[DSI_BLOCKSIZ];
389 #ifdef USE_WRITEV
390   struct iovec iov[2];
391   size_t towrite;
392   ssize_t len;
393 #endif /* USE_WRITEV */
394
395   block[0] = dsi->header.dsi_flags;
396   block[1] = dsi->header.dsi_command;
397   memcpy(block + 2, &dsi->header.dsi_requestID, 
398          sizeof(dsi->header.dsi_requestID));
399   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
400   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
401   memcpy(block + 12, &dsi->header.dsi_reserved,
402          sizeof(dsi->header.dsi_reserved));
403
404   if (!length) { /* just write the header */
405     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
406     return length; /* really 0 on failure, 1 on success */
407   }
408   
409   /* block signals */
410   block_sig(dsi);
411 #ifdef USE_WRITEV
412   iov[0].iov_base = block;
413   iov[0].iov_len = sizeof(block);
414   iov[1].iov_base = buf;
415   iov[1].iov_len = length;
416   
417   towrite = sizeof(block) + length;
418   dsi->write_count += towrite;
419   while (towrite > 0) {
420     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
421         !len)
422       continue;
423     
424     if ((size_t)len == towrite) /* wrote everything out */
425       break;
426     else if (len < 0) { /* error */
427       if (errno == EAGAIN || errno == EWOULDBLOCK) {
428           if (!dsi_peek(dsi)) {
429               continue;
430           }
431       }
432       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
433       unblock_sig(dsi);
434       return 0;
435     }
436     
437     towrite -= len;
438     if (towrite > length) { /* skip part of header */
439       iov[0].iov_base = (char *) iov[0].iov_base + len;
440       iov[0].iov_len -= len;
441     } else { /* skip to data */
442       if (iov[0].iov_len) {
443         len -= iov[0].iov_len;
444         iov[0].iov_len = 0;
445       }
446       iov[1].iov_base = (char *) iov[1].iov_base + len;
447       iov[1].iov_len -= len;
448     }
449   }
450   
451 #else /* USE_WRITEV */
452   /* write the header then data */
453   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
454             (dsi_stream_write(dsi, buf, length, 0) != length)) {
455       unblock_sig(dsi);
456       return 0;
457   }
458 #endif /* USE_WRITEV */
459
460   unblock_sig(dsi);
461   return 1;
462 }
463
464
465 /* ---------------------------------------
466  * read data. function on success. 0 on failure. data length gets
467  * stored in length variable. this should really use size_t's, but
468  * that would require changes elsewhere. */
469 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
470                        size_t *rlength)
471 {
472   char block[DSI_BLOCKSIZ];
473
474   /* read in the header */
475   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
476     return 0;
477
478   dsi->header.dsi_flags = block[0];
479   dsi->header.dsi_command = block[1];
480   /* FIXME, not the right place, 
481      but we get a server disconnect without reason in the log
482   */
483   if (!block[1]) {
484       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
485       return 0;
486   }
487
488   memcpy(&dsi->header.dsi_requestID, block + 2, 
489          sizeof(dsi->header.dsi_requestID));
490   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
491   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
492   memcpy(&dsi->header.dsi_reserved, block + 12,
493          sizeof(dsi->header.dsi_reserved));
494   dsi->clientID = ntohs(dsi->header.dsi_requestID);
495   
496   /* make sure we don't over-write our buffers. */
497   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
498   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
499     return 0;
500
501   return block[1];
502 }