]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Converting writing on socket to nonblocking io instead of SO_SNDTIMEO
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <netatalk/endian.h>
39 #include <atalk/util.h>
40
41 #define min(a,b)  ((a) < (b) ? (a) : (b))
42
43 #ifndef MSG_MORE
44 #define MSG_MORE 0x8000
45 #endif
46
47 #ifndef MSG_DONTWAIT
48 #define MSG_DONTWAIT 0x40
49 #endif
50
51 /* ------------------------- 
52  * we don't use a circular buffer.
53 */
54 static void dsi_init_buffer(DSI *dsi)
55 {
56     if (!dsi->buffer) {
57         /* XXX config options */
58         dsi->maxsize = 6 * dsi->server_quantum;
59         if (!dsi->maxsize)
60             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61         dsi->buffer = malloc(dsi->maxsize);
62         if (!dsi->buffer) {
63             return;
64         }
65         dsi->start = dsi->buffer;
66         dsi->eof = dsi->buffer;
67         dsi->end = dsi->buffer + dsi->maxsize;
68     }
69 }
70
71 /* ---------------------- 
72    afpd is sleeping too much while trying to send something.
73    May be there's no reader or the reader is also sleeping in write,
74    look if there's some data for us to read, hopefully it will wake up
75    the reader so we can write again.
76 */
77 static int dsi_peek(DSI *dsi)
78 {
79     fd_set readfds, writefds;
80     int    len;
81     int    maxfd;
82     int    ret;
83
84     FD_ZERO(&readfds);
85     FD_ZERO(&writefds);
86     FD_SET( dsi->socket, &readfds);
87     FD_SET( dsi->socket, &writefds);
88     maxfd = dsi->socket +1;
89
90     while (1) {
91         FD_SET( dsi->socket, &readfds);
92         FD_SET( dsi->socket, &writefds);
93
94         /* No timeout: if there's nothing to read nor nothing to write,
95          * we've got nothing to do at all */
96         if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
97             if (ret == -1 && errno == EINTR)
98                 /* we might have been interrupted by out timer, so restart select */
99                 continue;
100             /* give up */
101             break;
102         }
103
104         /* Check if there's sth to read, hopefully reading that will unblock the client */
105         if (FD_ISSET(dsi->socket, &readfds)) {
106             dsi_init_buffer(dsi);
107             len = dsi->end - dsi->eof;
108
109             if (len <= 0) {
110                 /* ouch, our buffer is full ! fall back to blocking IO 
111                  * could block and disconnect but it's better than a cpu hog */
112                 break;
113             }
114
115             len = read(dsi->socket, dsi->eof, len);
116             if (len <= 0)
117                 break;
118             dsi->eof += len;
119         }
120
121         if (FD_ISSET(dsi->socket, &writefds))
122             /* we can write again at last */
123             break;
124     }
125
126     return 0;
127 }
128
129 /* ------------------------------
130  * write raw data. return actual bytes read. checks against EINTR
131  * aren't necessary if all of the signals have SA_RESTART
132  * specified. */
133 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
134 {
135   size_t written;
136   ssize_t len;
137   unsigned int flags = 0;
138
139   dsi->in_write++;
140   written = 0;
141
142   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
143
144   /* non blocking mode */
145   if (setnonblock(dsi->socket, 1) < 0) {
146       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
147       return -1;
148   }
149
150   while (written < length) {
151       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
152       if (len >= 0) {
153           written += len;
154           continue;
155       }
156
157       if (errno == EINTR)
158           continue;
159
160       if (errno == EAGAIN || errno == EWOULDBLOCK) {
161           if (mode == DSI_NOWAIT && written == 0) {
162               /* DSI_NOWAIT is used by attention give up in this case. */
163               written = -1;
164               goto exit;
165           }
166
167           /* Try to read sth. in order to break up possible deadlock */
168           if (dsi_peek(dsi) != 0) {
169               written = -1;
170               goto exit;
171           }
172           /* Now try writing again */
173           continue;
174       }
175
176       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
177       written = -1;
178       goto exit;
179   }
180
181   if (setnonblock(dsi->socket, 0) < 0) {
182       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
183       written = -1;
184       goto exit;
185   }
186
187   dsi->write_count += written;
188
189 exit:
190   dsi->in_write--;
191   return written;
192 }
193
194
195 /* ---------------------------------
196 */
197 #ifdef WITH_SENDFILE
198 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
199 {
200   size_t written;
201   ssize_t len;
202
203   dsi->in_write++;
204   written = 0;
205
206   /* non blocking mode */
207   if (setnonblock(dsi->socket, 1) < 0) {
208       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
209       return -1;
210   }
211
212   while (written < length) {
213     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
214         
215     if (len < 0) {
216       if (errno == EINTR)
217           continue;
218       if (errno == EINVAL || errno == ENOSYS)
219           return -1;
220           
221       if (errno == EAGAIN || errno == EWOULDBLOCK) {
222           if (dsi_peek(dsi)) {
223               /* can't go back to blocking mode, exit, the next read
224                  will return with an error and afpd will die.
225               */
226               break;
227           }
228           continue;
229       }
230       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
231       break;
232     }
233     else if (!len) {
234         /* afpd is going to exit */
235         errno = EIO;
236         return -1; /* I think we're at EOF here... */
237     }
238     else 
239         written += len;
240   }
241
242   if (setnonblock(dsi->socket, 0) < 0) {
243       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
244       return -1;
245   }
246
247   dsi->write_count += written;
248   dsi->in_write--;
249   return written;
250 }
251 #endif
252
253 /* 
254  * Return all bytes up to count from dsi->buffer if there are any buffered there
255  */
256 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
257 {
258     size_t nbe = 0;
259     
260     if (dsi->start) {        
261         nbe = dsi->eof - dsi->start;
262
263         if (nbe > 0) {
264            nbe = min((size_t)nbe, count);
265            memcpy(buf, dsi->start, nbe);
266            dsi->start += nbe;
267
268            if (dsi->eof == dsi->start) 
269                dsi->start = dsi->eof = dsi->buffer;
270
271         }
272     }
273     return nbe;
274 }
275
276 /*
277  * Get bytes from buffer dsi->buffer or read from socket
278  *
279  * 1. Check if there are bytes in the the dsi->buffer buffer.
280  * 2. Return bytes from (1) if yes.
281  *    Note: this may return fewer bytes then requested in count !!
282  * 3. If the buffer was empty, read from the socket.
283  */
284 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
285 {
286     ssize_t nbe;
287     
288     if (!count)
289         return 0;
290
291     nbe = from_buf(dsi, buf, count); /* 1. */
292     if (nbe)
293         return nbe;             /* 2. */
294   
295     return read(dsi->socket, buf, count); /* 3. */
296 }
297
298 /*
299  * Essentially a loop around buf_read() to ensure "length" bytes are read
300  * from dsi->buffer and/or the socket.
301  */
302 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
303 {
304   size_t stored;
305   ssize_t len;
306   
307   stored = 0;
308   while (stored < length) {
309     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
310     if (len == -1 && errno == EINTR)
311       continue;
312     else if (len > 0)
313       stored += len;
314     else { /* eof or error */
315       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
316       if (len || stored || dsi->read_count) {
317           LOG(log_error, logtype_dsi, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
318       }
319       break;
320     }
321   }
322
323   dsi->read_count += stored;
324   return stored;
325 }
326
327 /*
328  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
329  * this tries to read larger chunks (8192 bytes) into a buffer.
330  */
331 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
332 {
333   size_t len;
334   size_t buflen;
335   
336   dsi_init_buffer(dsi);
337   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
338   dsi->read_count += len;
339   if (len == length) {          /* got enough bytes from there ? */
340       return len;               /* yes */
341   }
342
343   /* fill the buffer with 8192 bytes or until buffer is full */
344   buflen = min(8192, dsi->end - dsi->eof);
345   if (buflen > 0) {
346       ssize_t ret;
347       ret = read(dsi->socket, dsi->eof, buflen);
348       if (ret > 0)
349           dsi->eof += ret;
350   }
351
352   /* now get the remaining data */
353   len += dsi_stream_read(dsi, data + len, length - len);
354   return len;
355 }
356
357 /* ---------------------------------------
358 */
359 void dsi_sleep(DSI *dsi, const int state)
360 {
361     dsi->asleep = state;
362 }
363
364 /* ---------------------------------------
365 */
366 static void block_sig(DSI *dsi)
367 {
368   dsi->in_write++;
369 }
370
371 /* ---------------------------------------
372 */
373 static void unblock_sig(DSI *dsi)
374 {
375   dsi->in_write--;
376 }
377
378 /* ---------------------------------------
379  * write data. 0 on failure. this assumes that dsi_len will never
380  * cause an overflow in the data buffer. 
381  */
382 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
383 {
384   char block[DSI_BLOCKSIZ];
385 #ifdef USE_WRITEV
386   struct iovec iov[2];
387   size_t towrite;
388   ssize_t len;
389 #endif /* USE_WRITEV */
390
391   block[0] = dsi->header.dsi_flags;
392   block[1] = dsi->header.dsi_command;
393   memcpy(block + 2, &dsi->header.dsi_requestID, 
394          sizeof(dsi->header.dsi_requestID));
395   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
396   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
397   memcpy(block + 12, &dsi->header.dsi_reserved,
398          sizeof(dsi->header.dsi_reserved));
399
400   if (!length) { /* just write the header */
401     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
402     return length; /* really 0 on failure, 1 on success */
403   }
404   
405   /* block signals */
406   block_sig(dsi);
407 #ifdef USE_WRITEV
408   iov[0].iov_base = block;
409   iov[0].iov_len = sizeof(block);
410   iov[1].iov_base = buf;
411   iov[1].iov_len = length;
412   
413   towrite = sizeof(block) + length;
414   dsi->write_count += towrite;
415   while (towrite > 0) {
416     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
417         !len)
418       continue;
419     
420     if ((size_t)len == towrite) /* wrote everything out */
421       break;
422     else if (len < 0) { /* error */
423       if (errno == EAGAIN || errno == EWOULDBLOCK) {
424           if (!dsi_peek(dsi)) {
425               continue;
426           }
427       }
428       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
429       unblock_sig(dsi);
430       return 0;
431     }
432     
433     towrite -= len;
434     if (towrite > length) { /* skip part of header */
435       iov[0].iov_base = (char *) iov[0].iov_base + len;
436       iov[0].iov_len -= len;
437     } else { /* skip to data */
438       if (iov[0].iov_len) {
439         len -= iov[0].iov_len;
440         iov[0].iov_len = 0;
441       }
442       iov[1].iov_base = (char *) iov[1].iov_base + len;
443       iov[1].iov_len -= len;
444     }
445   }
446   
447 #else /* USE_WRITEV */
448   /* write the header then data */
449   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
450             (dsi_stream_write(dsi, buf, length, 0) != length)) {
451       unblock_sig(dsi);
452       return 0;
453   }
454 #endif /* USE_WRITEV */
455
456   unblock_sig(dsi);
457   return 1;
458 }
459
460
461 /* ---------------------------------------
462  * read data. function on success. 0 on failure. data length gets
463  * stored in length variable. this should really use size_t's, but
464  * that would require changes elsewhere. */
465 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
466                        size_t *rlength)
467 {
468   char block[DSI_BLOCKSIZ];
469
470   /* read in the header */
471   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
472     return 0;
473
474   dsi->header.dsi_flags = block[0];
475   dsi->header.dsi_command = block[1];
476   /* FIXME, not the right place, 
477      but we get a server disconnect without reason in the log
478   */
479   if (!block[1]) {
480       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
481       return 0;
482   }
483
484   memcpy(&dsi->header.dsi_requestID, block + 2, 
485          sizeof(dsi->header.dsi_requestID));
486   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
487   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
488   memcpy(&dsi->header.dsi_reserved, block + 12,
489          sizeof(dsi->header.dsi_reserved));
490   dsi->clientID = ntohs(dsi->header.dsi_requestID);
491   
492   /* make sure we don't over-write our buffers. */
493   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
494   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
495     return 0;
496
497   return block[1];
498 }