]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
afpd.conf.tmpl: Merge remote-tracking branch 'remotes/origin/branch-netatalk-2-1'
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <netatalk/endian.h>
39 #include <atalk/util.h>
40
41 #define min(a,b)  ((a) < (b) ? (a) : (b))
42
43 #ifndef MSG_MORE
44 #define MSG_MORE 0x8000
45 #endif
46
47 #ifndef MSG_DONTWAIT
48 #define MSG_DONTWAIT 0x40
49 #endif
50
51 /* ------------------------- 
52  * we don't use a circular buffer.
53 */
54 static void dsi_init_buffer(DSI *dsi)
55 {
56     if (!dsi->buffer) {
57         /* XXX config options */
58         dsi->maxsize = 6 * dsi->server_quantum;
59         if (!dsi->maxsize)
60             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61         dsi->buffer = malloc(dsi->maxsize);
62         if (!dsi->buffer) {
63             return;
64         }
65         dsi->start = dsi->buffer;
66         dsi->eof = dsi->buffer;
67         dsi->end = dsi->buffer + dsi->maxsize;
68     }
69 }
70
71 /* ---------------------- 
72    afpd is sleeping too much while trying to send something.
73    May be there's no reader or the reader is also sleeping in write,
74    look if there's some data for us to read, hopefully it will wake up
75    the reader so we can write again.
76 */
77 static int dsi_peek(DSI *dsi)
78 {
79     fd_set readfds, writefds;
80     int    len;
81     int    maxfd;
82     int    ret;
83
84     FD_ZERO(&readfds);
85     FD_ZERO(&writefds);
86     FD_SET( dsi->socket, &readfds);
87     FD_SET( dsi->socket, &writefds);
88     maxfd = dsi->socket +1;
89
90     while (1) {
91         FD_SET( dsi->socket, &readfds);
92         FD_SET( dsi->socket, &writefds);
93
94         /* No timeout: if there's nothing to read nor nothing to write,
95          * we've got nothing to do at all */
96         if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
97             if (ret == -1 && errno == EINTR)
98                 /* we might have been interrupted by out timer, so restart select */
99                 continue;
100             /* give up */
101             break;
102         }
103
104         /* Check if there's sth to read, hopefully reading that will unblock the client */
105         if (FD_ISSET(dsi->socket, &readfds)) {
106             dsi_init_buffer(dsi);
107             len = dsi->end - dsi->eof;
108
109             if (len <= 0) {
110                 /* ouch, our buffer is full ! fall back to blocking IO 
111                  * could block and disconnect but it's better than a cpu hog */
112                 break;
113             }
114
115             len = read(dsi->socket, dsi->eof, len);
116             if (len <= 0)
117                 break;
118             dsi->eof += len;
119         }
120
121         if (FD_ISSET(dsi->socket, &writefds))
122             /* we can write again at last */
123             break;
124     }
125
126     return 0;
127 }
128
129 /* ------------------------------
130  * write raw data. return actual bytes read. checks against EINTR
131  * aren't necessary if all of the signals have SA_RESTART
132  * specified. */
133 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
134 {
135   size_t written;
136   ssize_t len;
137   unsigned int flags = 0;
138
139   dsi->in_write++;
140   written = 0;
141
142   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
143
144   /* non blocking mode */
145   if (setnonblock(dsi->socket, 1) < 0) {
146       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
147       return -1;
148   }
149
150   while (written < length) {
151       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
152       if (len >= 0) {
153           written += len;
154           continue;
155       }
156
157       if (errno == EINTR)
158           continue;
159
160       if (errno == EAGAIN || errno == EWOULDBLOCK) {
161           if (mode == DSI_NOWAIT && written == 0) {
162               /* DSI_NOWAIT is used by attention give up in this case. */
163               written = -1;
164               goto exit;
165           }
166
167           /* Try to read sth. in order to break up possible deadlock */
168           if (dsi_peek(dsi) != 0) {
169               written = -1;
170               goto exit;
171           }
172           /* Now try writing again */
173           continue;
174       }
175
176       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
177       written = -1;
178       goto exit;
179   }
180
181   dsi->write_count += written;
182
183 exit:
184   if (setnonblock(dsi->socket, 0) < 0) {
185       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
186       written = -1;
187   }
188
189   dsi->in_write--;
190   return written;
191 }
192
193
194 /* ---------------------------------
195 */
196 #ifdef WITH_SENDFILE
197 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
198 {
199   size_t written;
200   ssize_t len;
201
202   dsi->in_write++;
203   written = 0;
204
205   /* non blocking mode */
206   if (setnonblock(dsi->socket, 1) < 0) {
207       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
208       return -1;
209   }
210
211   while (written < length) {
212     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
213         
214     if (len < 0) {
215       if (errno == EINTR)
216           continue;
217       if (errno == EINVAL || errno == ENOSYS)
218           return -1;
219           
220       if (errno == EAGAIN || errno == EWOULDBLOCK) {
221           if (dsi_peek(dsi)) {
222               /* can't go back to blocking mode, exit, the next read
223                  will return with an error and afpd will die.
224               */
225               break;
226           }
227           continue;
228       }
229       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
230       break;
231     }
232     else if (!len) {
233         /* afpd is going to exit */
234         errno = EIO;
235         return -1; /* I think we're at EOF here... */
236     }
237     else 
238         written += len;
239   }
240
241   if (setnonblock(dsi->socket, 0) < 0) {
242       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
243       return -1;
244   }
245
246   dsi->write_count += written;
247   dsi->in_write--;
248   return written;
249 }
250 #endif
251
252 /* 
253  * Return all bytes up to count from dsi->buffer if there are any buffered there
254  */
255 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
256 {
257     size_t nbe = 0;
258     
259     if (dsi->start) {        
260         nbe = dsi->eof - dsi->start;
261
262         if (nbe > 0) {
263            nbe = min((size_t)nbe, count);
264            memcpy(buf, dsi->start, nbe);
265            dsi->start += nbe;
266
267            if (dsi->eof == dsi->start) 
268                dsi->start = dsi->eof = dsi->buffer;
269
270         }
271     }
272     return nbe;
273 }
274
275 /*
276  * Get bytes from buffer dsi->buffer or read from socket
277  *
278  * 1. Check if there are bytes in the the dsi->buffer buffer.
279  * 2. Return bytes from (1) if yes.
280  *    Note: this may return fewer bytes then requested in count !!
281  * 3. If the buffer was empty, read from the socket.
282  */
283 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
284 {
285     ssize_t nbe;
286     
287     if (!count)
288         return 0;
289
290     nbe = from_buf(dsi, buf, count); /* 1. */
291     if (nbe)
292         return nbe;             /* 2. */
293   
294     return read(dsi->socket, buf, count); /* 3. */
295 }
296
297 /*
298  * Essentially a loop around buf_read() to ensure "length" bytes are read
299  * from dsi->buffer and/or the socket.
300  */
301 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
302 {
303   size_t stored;
304   ssize_t len;
305
306   stored = 0;
307   while (stored < length) {
308     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
309     if (len == -1 && errno == EINTR) {
310       continue;
311     } else if (len > 0) {
312       stored += len;
313     } else { /* eof or error */
314       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
315       if (len || stored || dsi->read_count) {
316           if (! (dsi->flags & DSI_DISCONNECTED))
317               LOG(log_error, logtype_dsi, "dsi_stream_read(fd: %i): len:%d, %s",
318                   dsi->socket, len, (len < 0) ? strerror(errno) : "unexpected EOF");
319       }
320       break;
321     }
322   }
323
324   dsi->read_count += stored;
325   return stored;
326 }
327
328 /*
329  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
330  * this tries to read larger chunks (8192 bytes) into a buffer.
331  */
332 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
333 {
334   size_t len;
335   size_t buflen;
336   
337   dsi_init_buffer(dsi);
338   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
339   dsi->read_count += len;
340   if (len == length) {          /* got enough bytes from there ? */
341       return len;               /* yes */
342   }
343
344   /* fill the buffer with 8192 bytes or until buffer is full */
345   buflen = min(8192, dsi->end - dsi->eof);
346   if (buflen > 0) {
347       ssize_t ret;
348       ret = read(dsi->socket, dsi->eof, buflen);
349       if (ret > 0)
350           dsi->eof += ret;
351   }
352
353   /* now get the remaining data */
354   len += dsi_stream_read(dsi, data + len, length - len);
355   return len;
356 }
357
358 /* ---------------------------------------
359 */
360 static void block_sig(DSI *dsi)
361 {
362   dsi->in_write++;
363 }
364
365 /* ---------------------------------------
366 */
367 static void unblock_sig(DSI *dsi)
368 {
369   dsi->in_write--;
370 }
371
372 /* ---------------------------------------
373  * write data. 0 on failure. this assumes that dsi_len will never
374  * cause an overflow in the data buffer. 
375  */
376 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
377 {
378   char block[DSI_BLOCKSIZ];
379 #ifdef USE_WRITEV
380   struct iovec iov[2];
381   size_t towrite;
382   ssize_t len;
383 #endif /* USE_WRITEV */
384
385   block[0] = dsi->header.dsi_flags;
386   block[1] = dsi->header.dsi_command;
387   memcpy(block + 2, &dsi->header.dsi_requestID, 
388          sizeof(dsi->header.dsi_requestID));
389   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
390   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
391   memcpy(block + 12, &dsi->header.dsi_reserved,
392          sizeof(dsi->header.dsi_reserved));
393
394   if (!length) { /* just write the header */
395     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
396     return length; /* really 0 on failure, 1 on success */
397   }
398   
399   /* block signals */
400   block_sig(dsi);
401 #ifdef USE_WRITEV
402   iov[0].iov_base = block;
403   iov[0].iov_len = sizeof(block);
404   iov[1].iov_base = buf;
405   iov[1].iov_len = length;
406   
407   towrite = sizeof(block) + length;
408   dsi->write_count += towrite;
409   while (towrite > 0) {
410     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
411         !len)
412       continue;
413     
414     if ((size_t)len == towrite) /* wrote everything out */
415       break;
416     else if (len < 0) { /* error */
417       if (errno == EAGAIN || errno == EWOULDBLOCK) {
418           if (!dsi_peek(dsi)) {
419               continue;
420           }
421       }
422       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
423       unblock_sig(dsi);
424       return 0;
425     }
426     
427     towrite -= len;
428     if (towrite > length) { /* skip part of header */
429       iov[0].iov_base = (char *) iov[0].iov_base + len;
430       iov[0].iov_len -= len;
431     } else { /* skip to data */
432       if (iov[0].iov_len) {
433         len -= iov[0].iov_len;
434         iov[0].iov_len = 0;
435       }
436       iov[1].iov_base = (char *) iov[1].iov_base + len;
437       iov[1].iov_len -= len;
438     }
439   }
440   
441 #else /* USE_WRITEV */
442   /* write the header then data */
443   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
444             (dsi_stream_write(dsi, buf, length, 0) != length)) {
445       unblock_sig(dsi);
446       return 0;
447   }
448 #endif /* USE_WRITEV */
449
450   unblock_sig(dsi);
451   return 1;
452 }
453
454
455 /* ---------------------------------------
456  * read data. function on success. 0 on failure. data length gets
457  * stored in length variable. this should really use size_t's, but
458  * that would require changes elsewhere. */
459 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
460                        size_t *rlength)
461 {
462   char block[DSI_BLOCKSIZ];
463
464   /* read in the header */
465   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
466     return 0;
467
468   dsi->header.dsi_flags = block[0];
469   dsi->header.dsi_command = block[1];
470   /* FIXME, not the right place, 
471      but we get a server disconnect without reason in the log
472   */
473   if (!block[1]) {
474       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
475       return 0;
476   }
477
478   memcpy(&dsi->header.dsi_requestID, block + 2, 
479          sizeof(dsi->header.dsi_requestID));
480   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
481   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
482   memcpy(&dsi->header.dsi_reserved, block + 12,
483          sizeof(dsi->header.dsi_reserved));
484   dsi->clientID = ntohs(dsi->header.dsi_requestID);
485   
486   /* make sure we don't over-write our buffers. */
487   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
488   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
489     return 0;
490
491   return block[1];
492 }