]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Merge master
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <atalk/util.h>
39
40 #define min(a,b)  ((a) < (b) ? (a) : (b))
41
42 #ifndef MSG_MORE
43 #define MSG_MORE 0x8000
44 #endif
45
46 #ifndef MSG_DONTWAIT
47 #define MSG_DONTWAIT 0x40
48 #endif
49
50 /* ------------------------- 
51  * we don't use a circular buffer.
52 */
53 static void dsi_init_buffer(DSI *dsi)
54 {
55     if (!dsi->buffer) {
56         /* XXX config options */
57         dsi->maxsize = 6 * dsi->server_quantum;
58         if (!dsi->maxsize)
59             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
60         dsi->buffer = malloc(dsi->maxsize);
61         if (!dsi->buffer) {
62             return;
63         }
64         dsi->start = dsi->buffer;
65         dsi->eof = dsi->buffer;
66         dsi->end = dsi->buffer + dsi->maxsize;
67     }
68 }
69
70 /* ---------------------- 
71    afpd is sleeping too much while trying to send something.
72    May be there's no reader or the reader is also sleeping in write,
73    look if there's some data for us to read, hopefully it will wake up
74    the reader so we can write again.
75 */
76 static int dsi_peek(DSI *dsi)
77 {
78     fd_set readfds, writefds;
79     int    len;
80     int    maxfd;
81     int    ret;
82
83     FD_ZERO(&readfds);
84     FD_ZERO(&writefds);
85     FD_SET( dsi->socket, &readfds);
86     FD_SET( dsi->socket, &writefds);
87     maxfd = dsi->socket +1;
88
89     while (1) {
90         FD_SET( dsi->socket, &readfds);
91         FD_SET( dsi->socket, &writefds);
92
93         /* No timeout: if there's nothing to read nor nothing to write,
94          * we've got nothing to do at all */
95         if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
96             if (ret == -1 && errno == EINTR)
97                 /* we might have been interrupted by out timer, so restart select */
98                 continue;
99             /* give up */
100             break;
101         }
102
103         /* Check if there's sth to read, hopefully reading that will unblock the client */
104         if (FD_ISSET(dsi->socket, &readfds)) {
105             dsi_init_buffer(dsi);
106             len = dsi->end - dsi->eof;
107
108             if (len <= 0) {
109                 /* ouch, our buffer is full ! fall back to blocking IO 
110                  * could block and disconnect but it's better than a cpu hog */
111                 break;
112             }
113
114             len = read(dsi->socket, dsi->eof, len);
115             if (len <= 0)
116                 break;
117             dsi->eof += len;
118         }
119
120         if (FD_ISSET(dsi->socket, &writefds))
121             /* we can write again at last */
122             break;
123     }
124
125     return 0;
126 }
127
128 /* ------------------------------
129  * write raw data. return actual bytes read. checks against EINTR
130  * aren't necessary if all of the signals have SA_RESTART
131  * specified. */
132 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
133 {
134   size_t written;
135   ssize_t len;
136   unsigned int flags = 0;
137
138   dsi->in_write++;
139   written = 0;
140
141   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
142
143   /* non blocking mode */
144   if (setnonblock(dsi->socket, 1) < 0) {
145       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
146       return -1;
147   }
148
149   while (written < length) {
150       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
151       if (len >= 0) {
152           written += len;
153           continue;
154       }
155
156       if (errno == EINTR)
157           continue;
158
159       if (errno == EAGAIN || errno == EWOULDBLOCK) {
160           if (mode == DSI_NOWAIT && written == 0) {
161               /* DSI_NOWAIT is used by attention give up in this case. */
162               written = -1;
163               goto exit;
164           }
165
166           /* Try to read sth. in order to break up possible deadlock */
167           if (dsi_peek(dsi) != 0) {
168               written = -1;
169               goto exit;
170           }
171           /* Now try writing again */
172           continue;
173       }
174
175       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
176       written = -1;
177       goto exit;
178   }
179
180   dsi->write_count += written;
181
182 exit:
183   if (setnonblock(dsi->socket, 0) < 0) {
184       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
185       written = -1;
186   }
187
188   dsi->in_write--;
189   return written;
190 }
191
192
193 /* ---------------------------------
194 */
195 #ifdef WITH_SENDFILE
196 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
197 {
198   size_t written;
199   ssize_t len;
200
201   dsi->in_write++;
202   written = 0;
203
204   /* non blocking mode */
205   if (setnonblock(dsi->socket, 1) < 0) {
206       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
207       return -1;
208   }
209
210   while (written < length) {
211     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
212         
213     if (len < 0) {
214       if (errno == EINTR)
215           continue;
216       if (errno == EINVAL || errno == ENOSYS)
217           return -1;
218           
219       if (errno == EAGAIN || errno == EWOULDBLOCK) {
220           if (dsi_peek(dsi)) {
221               /* can't go back to blocking mode, exit, the next read
222                  will return with an error and afpd will die.
223               */
224               break;
225           }
226           continue;
227       }
228       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
229       break;
230     }
231     else if (!len) {
232         /* afpd is going to exit */
233         errno = EIO;
234         return -1; /* I think we're at EOF here... */
235     }
236     else 
237         written += len;
238   }
239
240   if (setnonblock(dsi->socket, 0) < 0) {
241       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
242       return -1;
243   }
244
245   dsi->write_count += written;
246   dsi->in_write--;
247   return written;
248 }
249 #endif
250
251 /* 
252  * Return all bytes up to count from dsi->buffer if there are any buffered there
253  */
254 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
255 {
256     size_t nbe = 0;
257     
258     if (dsi->start) {        
259         nbe = dsi->eof - dsi->start;
260
261         if (nbe > 0) {
262            nbe = min((size_t)nbe, count);
263            memcpy(buf, dsi->start, nbe);
264            dsi->start += nbe;
265
266            if (dsi->eof == dsi->start) 
267                dsi->start = dsi->eof = dsi->buffer;
268
269         }
270     }
271     return nbe;
272 }
273
274 /*
275  * Get bytes from buffer dsi->buffer or read from socket
276  *
277  * 1. Check if there are bytes in the the dsi->buffer buffer.
278  * 2. Return bytes from (1) if yes.
279  *    Note: this may return fewer bytes then requested in count !!
280  * 3. If the buffer was empty, read from the socket.
281  */
282 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
283 {
284     ssize_t nbe;
285     
286     if (!count)
287         return 0;
288
289     nbe = from_buf(dsi, buf, count); /* 1. */
290     if (nbe)
291         return nbe;             /* 2. */
292   
293     return read(dsi->socket, buf, count); /* 3. */
294 }
295
296 /*
297  * Essentially a loop around buf_read() to ensure "length" bytes are read
298  * from dsi->buffer and/or the socket.
299  */
300 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
301 {
302   size_t stored;
303   ssize_t len;
304
305   stored = 0;
306   while (stored < length) {
307     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
308     if (len == -1 && errno == EINTR) {
309       continue;
310     } else if (len > 0) {
311       stored += len;
312     } else { /* eof or error */
313       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
314       if (len || stored || dsi->read_count) {
315           if (! (dsi->flags & DSI_DISCONNECTED))
316               LOG(log_error, logtype_dsi, "dsi_stream_read(fd: %i): len:%d, %s",
317                   dsi->socket, len, (len < 0) ? strerror(errno) : "unexpected EOF");
318       }
319       break;
320     }
321   }
322
323   dsi->read_count += stored;
324   return stored;
325 }
326
327 /*
328  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
329  * this tries to read larger chunks (8192 bytes) into a buffer.
330  */
331 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
332 {
333   size_t len;
334   size_t buflen;
335   
336   dsi_init_buffer(dsi);
337   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
338   dsi->read_count += len;
339   if (len == length) {          /* got enough bytes from there ? */
340       return len;               /* yes */
341   }
342
343   /* fill the buffer with 8192 bytes or until buffer is full */
344   buflen = min(8192, dsi->end - dsi->eof);
345   if (buflen > 0) {
346       ssize_t ret;
347       ret = read(dsi->socket, dsi->eof, buflen);
348       if (ret > 0)
349           dsi->eof += ret;
350   }
351
352   /* now get the remaining data */
353   len += dsi_stream_read(dsi, data + len, length - len);
354   return len;
355 }
356
357 /* ---------------------------------------
358 */
359 static void block_sig(DSI *dsi)
360 {
361   dsi->in_write++;
362 }
363
364 /* ---------------------------------------
365 */
366 static void unblock_sig(DSI *dsi)
367 {
368   dsi->in_write--;
369 }
370
371 /* ---------------------------------------
372  * write data. 0 on failure. this assumes that dsi_len will never
373  * cause an overflow in the data buffer. 
374  */
375 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
376 {
377   char block[DSI_BLOCKSIZ];
378 #ifdef USE_WRITEV
379   struct iovec iov[2];
380   size_t towrite;
381   ssize_t len;
382 #endif /* USE_WRITEV */
383
384   block[0] = dsi->header.dsi_flags;
385   block[1] = dsi->header.dsi_command;
386   memcpy(block + 2, &dsi->header.dsi_requestID, 
387          sizeof(dsi->header.dsi_requestID));
388   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
389   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
390   memcpy(block + 12, &dsi->header.dsi_reserved,
391          sizeof(dsi->header.dsi_reserved));
392
393   if (!length) { /* just write the header */
394     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
395     return length; /* really 0 on failure, 1 on success */
396   }
397   
398   /* block signals */
399   block_sig(dsi);
400 #ifdef USE_WRITEV
401   iov[0].iov_base = block;
402   iov[0].iov_len = sizeof(block);
403   iov[1].iov_base = buf;
404   iov[1].iov_len = length;
405   
406   towrite = sizeof(block) + length;
407   dsi->write_count += towrite;
408   while (towrite > 0) {
409     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
410         !len)
411       continue;
412     
413     if ((size_t)len == towrite) /* wrote everything out */
414       break;
415     else if (len < 0) { /* error */
416       if (errno == EAGAIN || errno == EWOULDBLOCK) {
417           if (!dsi_peek(dsi)) {
418               continue;
419           }
420       }
421       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
422       unblock_sig(dsi);
423       return 0;
424     }
425     
426     towrite -= len;
427     if (towrite > length) { /* skip part of header */
428       iov[0].iov_base = (char *) iov[0].iov_base + len;
429       iov[0].iov_len -= len;
430     } else { /* skip to data */
431       if (iov[0].iov_len) {
432         len -= iov[0].iov_len;
433         iov[0].iov_len = 0;
434       }
435       iov[1].iov_base = (char *) iov[1].iov_base + len;
436       iov[1].iov_len -= len;
437     }
438   }
439   
440 #else /* USE_WRITEV */
441   /* write the header then data */
442   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
443             (dsi_stream_write(dsi, buf, length, 0) != length)) {
444       unblock_sig(dsi);
445       return 0;
446   }
447 #endif /* USE_WRITEV */
448
449   unblock_sig(dsi);
450   return 1;
451 }
452
453
454 /* ---------------------------------------
455  * read data. function on success. 0 on failure. data length gets
456  * stored in length variable. this should really use size_t's, but
457  * that would require changes elsewhere. */
458 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
459                        size_t *rlength)
460 {
461   char block[DSI_BLOCKSIZ];
462
463   /* read in the header */
464   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
465     return 0;
466
467   dsi->header.dsi_flags = block[0];
468   dsi->header.dsi_command = block[1];
469   /* FIXME, not the right place, 
470      but we get a server disconnect without reason in the log
471   */
472   if (!block[1]) {
473       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
474       return 0;
475   }
476
477   memcpy(&dsi->header.dsi_requestID, block + 2, 
478          sizeof(dsi->header.dsi_requestID));
479   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
480   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
481   memcpy(&dsi->header.dsi_reserved, block + 12,
482          sizeof(dsi->header.dsi_reserved));
483   dsi->clientID = ntohs(dsi->header.dsi_requestID);
484   
485   /* make sure we don't over-write our buffers. */
486   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
487   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
488     return 0;
489
490   return block[1];
491 }