]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Merge branch-2-1
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <netatalk/endian.h>
39 #include <atalk/util.h>
40
41 #define min(a,b)  ((a) < (b) ? (a) : (b))
42
43 #ifndef MSG_MORE
44 #define MSG_MORE 0x8000
45 #endif
46
47 #ifndef MSG_DONTWAIT
48 #define MSG_DONTWAIT 0x40
49 #endif
50
51 /* ------------------------- 
52  * we don't use a circular buffer.
53 */
54 static void dsi_init_buffer(DSI *dsi)
55 {
56     if (!dsi->buffer) {
57         /* XXX config options */
58         dsi->maxsize = 6 * dsi->server_quantum;
59         if (!dsi->maxsize)
60             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61         dsi->buffer = malloc(dsi->maxsize);
62         if (!dsi->buffer) {
63             return;
64         }
65         dsi->start = dsi->buffer;
66         dsi->eof = dsi->buffer;
67         dsi->end = dsi->buffer + dsi->maxsize;
68     }
69 }
70
71 /* ---------------------- 
72    afpd is sleeping too much while trying to send something.
73    May be there's no reader or the reader is also sleeping in write,
74    look if there's some data for us to read, hopefully it will wake up
75    the reader so we can write again.
76 */
77 static int dsi_peek(DSI *dsi)
78 {
79     fd_set readfds, writefds;
80     int    len;
81     int    maxfd;
82     int    ret;
83
84     FD_ZERO(&readfds);
85     FD_ZERO(&writefds);
86     FD_SET( dsi->socket, &readfds);
87     FD_SET( dsi->socket, &writefds);
88     maxfd = dsi->socket +1;
89
90     while (1) {
91         FD_SET( dsi->socket, &readfds);
92         FD_SET( dsi->socket, &writefds);
93
94         /* No timeout: if there's nothing to read nor nothing to write,
95          * we've got nothing to do at all */
96         if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
97             if (ret == -1 && errno == EINTR)
98                 /* we might have been interrupted by out timer, so restart select */
99                 continue;
100             /* give up */
101             break;
102         }
103
104         /* Check if there's sth to read, hopefully reading that will unblock the client */
105         if (FD_ISSET(dsi->socket, &readfds)) {
106             dsi_init_buffer(dsi);
107             len = dsi->end - dsi->eof;
108
109             if (len <= 0) {
110                 /* ouch, our buffer is full ! fall back to blocking IO 
111                  * could block and disconnect but it's better than a cpu hog */
112                 break;
113             }
114
115             len = read(dsi->socket, dsi->eof, len);
116             if (len <= 0)
117                 break;
118             dsi->eof += len;
119         }
120
121         if (FD_ISSET(dsi->socket, &writefds))
122             /* we can write again at last */
123             break;
124     }
125
126     return 0;
127 }
128
129 /* ------------------------------
130  * write raw data. return actual bytes read. checks against EINTR
131  * aren't necessary if all of the signals have SA_RESTART
132  * specified. */
133 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
134 {
135   size_t written;
136   ssize_t len;
137   unsigned int flags = 0;
138
139   dsi->in_write++;
140   written = 0;
141
142   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
143
144   /* non blocking mode */
145   if (setnonblock(dsi->socket, 1) < 0) {
146       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
147       return -1;
148   }
149
150   while (written < length) {
151       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
152       if (len >= 0) {
153           written += len;
154           continue;
155       }
156
157       if (errno == EINTR)
158           continue;
159
160       if (errno == EAGAIN || errno == EWOULDBLOCK) {
161           if (mode == DSI_NOWAIT && written == 0) {
162               /* DSI_NOWAIT is used by attention give up in this case. */
163               written = -1;
164               goto exit;
165           }
166
167           /* Try to read sth. in order to break up possible deadlock */
168           if (dsi_peek(dsi) != 0) {
169               written = -1;
170               goto exit;
171           }
172           /* Now try writing again */
173           continue;
174       }
175
176       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
177       written = -1;
178       goto exit;
179   }
180
181   dsi->write_count += written;
182
183 exit:
184   if (setnonblock(dsi->socket, 0) < 0) {
185       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
186       written = -1;
187   }
188
189   dsi->in_write--;
190   return written;
191 }
192
193
194 /* ---------------------------------
195 */
196 #ifdef WITH_SENDFILE
197 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
198 {
199   size_t written;
200   ssize_t len;
201
202   dsi->in_write++;
203   written = 0;
204
205   /* non blocking mode */
206   if (setnonblock(dsi->socket, 1) < 0) {
207       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
208       return -1;
209   }
210
211   while (written < length) {
212     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
213         
214     if (len < 0) {
215       if (errno == EINTR)
216           continue;
217       if (errno == EINVAL || errno == ENOSYS)
218           return -1;
219           
220       if (errno == EAGAIN || errno == EWOULDBLOCK) {
221           if (dsi_peek(dsi)) {
222               /* can't go back to blocking mode, exit, the next read
223                  will return with an error and afpd will die.
224               */
225               break;
226           }
227           continue;
228       }
229       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
230       break;
231     }
232     else if (!len) {
233         /* afpd is going to exit */
234         errno = EIO;
235         return -1; /* I think we're at EOF here... */
236     }
237     else 
238         written += len;
239   }
240
241   if (setnonblock(dsi->socket, 0) < 0) {
242       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
243       return -1;
244   }
245
246   dsi->write_count += written;
247   dsi->in_write--;
248   return written;
249 }
250 #endif
251
252 /* 
253  * Return all bytes up to count from dsi->buffer if there are any buffered there
254  */
255 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
256 {
257     size_t nbe = 0;
258     
259     if (dsi->start) {        
260         nbe = dsi->eof - dsi->start;
261
262         if (nbe > 0) {
263            nbe = min((size_t)nbe, count);
264            memcpy(buf, dsi->start, nbe);
265            dsi->start += nbe;
266
267            if (dsi->eof == dsi->start) 
268                dsi->start = dsi->eof = dsi->buffer;
269
270         }
271     }
272     return nbe;
273 }
274
275 /*
276  * Get bytes from buffer dsi->buffer or read from socket
277  *
278  * 1. Check if there are bytes in the the dsi->buffer buffer.
279  * 2. Return bytes from (1) if yes.
280  *    Note: this may return fewer bytes then requested in count !!
281  * 3. If the buffer was empty, read from the socket.
282  */
283 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
284 {
285     ssize_t nbe;
286     
287     if (!count)
288         return 0;
289
290     nbe = from_buf(dsi, buf, count); /* 1. */
291     if (nbe)
292         return nbe;             /* 2. */
293   
294     return read(dsi->socket, buf, count); /* 3. */
295 }
296
297 /*
298  * Essentially a loop around buf_read() to ensure "length" bytes are read
299  * from dsi->buffer and/or the socket.
300  */
301 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
302 {
303   size_t stored;
304   ssize_t len;
305   
306   stored = 0;
307   while (stored < length) {
308     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
309     if (len == -1 && errno == EINTR)
310       continue;
311     else if (len > 0)
312       stored += len;
313     else { /* eof or error */
314       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
315       if (len || stored || dsi->read_count) {
316           LOG(log_error, logtype_dsi, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
317       }
318       break;
319     }
320   }
321
322   dsi->read_count += stored;
323   return stored;
324 }
325
326 /*
327  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
328  * this tries to read larger chunks (8192 bytes) into a buffer.
329  */
330 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
331 {
332   size_t len;
333   size_t buflen;
334   
335   dsi_init_buffer(dsi);
336   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
337   dsi->read_count += len;
338   if (len == length) {          /* got enough bytes from there ? */
339       return len;               /* yes */
340   }
341
342   /* fill the buffer with 8192 bytes or until buffer is full */
343   buflen = min(8192, dsi->end - dsi->eof);
344   if (buflen > 0) {
345       ssize_t ret;
346       ret = read(dsi->socket, dsi->eof, buflen);
347       if (ret > 0)
348           dsi->eof += ret;
349   }
350
351   /* now get the remaining data */
352   len += dsi_stream_read(dsi, data + len, length - len);
353   return len;
354 }
355
356 /* ---------------------------------------
357 */
358 void dsi_sleep(DSI *dsi, const int state)
359 {
360     dsi->asleep = state;
361 }
362
363 /* ---------------------------------------
364 */
365 static void block_sig(DSI *dsi)
366 {
367   dsi->in_write++;
368 }
369
370 /* ---------------------------------------
371 */
372 static void unblock_sig(DSI *dsi)
373 {
374   dsi->in_write--;
375 }
376
377 /* ---------------------------------------
378  * write data. 0 on failure. this assumes that dsi_len will never
379  * cause an overflow in the data buffer. 
380  */
381 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
382 {
383   char block[DSI_BLOCKSIZ];
384 #ifdef USE_WRITEV
385   struct iovec iov[2];
386   size_t towrite;
387   ssize_t len;
388 #endif /* USE_WRITEV */
389
390   block[0] = dsi->header.dsi_flags;
391   block[1] = dsi->header.dsi_command;
392   memcpy(block + 2, &dsi->header.dsi_requestID, 
393          sizeof(dsi->header.dsi_requestID));
394   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
395   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
396   memcpy(block + 12, &dsi->header.dsi_reserved,
397          sizeof(dsi->header.dsi_reserved));
398
399   if (!length) { /* just write the header */
400     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
401     return length; /* really 0 on failure, 1 on success */
402   }
403   
404   /* block signals */
405   block_sig(dsi);
406 #ifdef USE_WRITEV
407   iov[0].iov_base = block;
408   iov[0].iov_len = sizeof(block);
409   iov[1].iov_base = buf;
410   iov[1].iov_len = length;
411   
412   towrite = sizeof(block) + length;
413   dsi->write_count += towrite;
414   while (towrite > 0) {
415     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
416         !len)
417       continue;
418     
419     if ((size_t)len == towrite) /* wrote everything out */
420       break;
421     else if (len < 0) { /* error */
422       if (errno == EAGAIN || errno == EWOULDBLOCK) {
423           if (!dsi_peek(dsi)) {
424               continue;
425           }
426       }
427       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
428       unblock_sig(dsi);
429       return 0;
430     }
431     
432     towrite -= len;
433     if (towrite > length) { /* skip part of header */
434       iov[0].iov_base = (char *) iov[0].iov_base + len;
435       iov[0].iov_len -= len;
436     } else { /* skip to data */
437       if (iov[0].iov_len) {
438         len -= iov[0].iov_len;
439         iov[0].iov_len = 0;
440       }
441       iov[1].iov_base = (char *) iov[1].iov_base + len;
442       iov[1].iov_len -= len;
443     }
444   }
445   
446 #else /* USE_WRITEV */
447   /* write the header then data */
448   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
449             (dsi_stream_write(dsi, buf, length, 0) != length)) {
450       unblock_sig(dsi);
451       return 0;
452   }
453 #endif /* USE_WRITEV */
454
455   unblock_sig(dsi);
456   return 1;
457 }
458
459
460 /* ---------------------------------------
461  * read data. function on success. 0 on failure. data length gets
462  * stored in length variable. this should really use size_t's, but
463  * that would require changes elsewhere. */
464 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
465                        size_t *rlength)
466 {
467   char block[DSI_BLOCKSIZ];
468
469   /* read in the header */
470   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
471     return 0;
472
473   dsi->header.dsi_flags = block[0];
474   dsi->header.dsi_command = block[1];
475   /* FIXME, not the right place, 
476      but we get a server disconnect without reason in the log
477   */
478   if (!block[1]) {
479       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
480       return 0;
481   }
482
483   memcpy(&dsi->header.dsi_requestID, block + 2, 
484          sizeof(dsi->header.dsi_requestID));
485   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
486   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
487   memcpy(&dsi->header.dsi_reserved, block + 12,
488          sizeof(dsi->header.dsi_reserved));
489   dsi->clientID = ntohs(dsi->header.dsi_requestID);
490   
491   /* make sure we don't over-write our buffers. */
492   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
493   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
494     return 0;
495
496   return block[1];
497 }