]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Fix logtype
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <netatalk/endian.h>
39 #include <atalk/util.h>
40
41 #define min(a,b)  ((a) < (b) ? (a) : (b))
42
43 #ifndef MSG_MORE
44 #define MSG_MORE 0x8000
45 #endif
46
47 #ifndef MSG_DONTWAIT
48 #define MSG_DONTWAIT 0x40
49 #endif
50
51 /* ------------------------- 
52  * we don't use a circular buffer.
53 */
54 static void dsi_init_buffer(DSI *dsi)
55 {
56     if (!dsi->buffer) {
57         /* XXX config options */
58         dsi->maxsize = 6 * dsi->server_quantum;
59         if (!dsi->maxsize)
60             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61         dsi->buffer = malloc(dsi->maxsize);
62         if (!dsi->buffer) {
63             return;
64         }
65         dsi->start = dsi->buffer;
66         dsi->eof = dsi->buffer;
67         dsi->end = dsi->buffer + dsi->maxsize;
68     }
69 }
70
71 /* ---------------------- 
72    afpd is sleeping too much while trying to send something.
73    May be there's no reader or the reader is also sleeping in write,
74    look if there's some data for us to read, hopefully it will wake up
75    the reader so we can write again.
76 */
77 static int dsi_peek(DSI *dsi)
78 {
79     fd_set readfds, writefds;
80     int    len;
81     int    maxfd;
82     int    ret;
83
84     FD_ZERO(&readfds);
85     FD_ZERO(&writefds);
86     FD_SET( dsi->socket, &readfds);
87     FD_SET( dsi->socket, &writefds);
88     maxfd = dsi->socket +1;
89
90     while (1) {
91         FD_SET( dsi->socket, &readfds);
92         FD_SET( dsi->socket, &writefds);
93
94         /* No timeout: if there's nothing to read nor nothing to write,
95          * we've got nothing to do at all */
96         if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
97             if (ret == -1 && errno == EINTR)
98                 /* we might have been interrupted by out timer, so restart select */
99                 continue;
100             /* give up */
101             LOG(log_error, logtype_dsi, "dsi_peek: unexpected select return: %d %s",
102                 ret, ret < 0 ? strerror(errno) : "");
103             setnonblock(dsi->socket, 0);
104             break;
105         }
106
107         /* Check if there's sth to read, hopefully reading that will unblock the client */
108         if (FD_ISSET(dsi->socket, &readfds)) {
109             dsi_init_buffer(dsi);
110             len = dsi->end - dsi->eof;
111
112             if (len <= 0) {
113                 /* ouch, our buffer is full ! fall back to blocking IO 
114                  * could block and disconnect but it's better than a cpu hog */
115                 LOG(log_error, logtype_dsi, "dsi_peek: read buffer is full");
116                 setnonblock(dsi->socket, 0);
117                 break;
118             }
119
120             if ((len = read(dsi->socket, dsi->eof, len)) <= 0) {
121                 LOG(log_error, logtype_dsi, "dsi_peek: read: %d %s",
122                     len, len < 0 ? strerror(errno) : "");
123                 break;
124             }
125
126             dsi->eof += len;
127             continue;
128         }
129
130         if (FD_ISSET(dsi->socket, &writefds))
131             /* we can write again at last */
132             LOG(log_error, logtype_dsi, "dsi_peek: can write again");
133             break;
134     }
135
136     return 0;
137 }
138
139 /* ------------------------------
140  * write raw data. return actual bytes read. checks against EINTR
141  * aren't necessary if all of the signals have SA_RESTART
142  * specified. */
143 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
144 {
145   size_t written;
146   ssize_t len;
147   unsigned int flags = 0;
148
149   dsi->in_write++;
150   written = 0;
151
152   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
153
154   /* non blocking mode */
155   if (setnonblock(dsi->socket, 1) < 0) {
156       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
157       return -1;
158   }
159
160   while (written < length) {
161       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
162       if (len >= 0) {
163           written += len;
164           continue;
165       }
166
167       if (errno == EINTR)
168           continue;
169
170       if (errno == EAGAIN || errno == EWOULDBLOCK) {
171           if (mode == DSI_NOWAIT && written == 0) {
172               /* DSI_NOWAIT is used by attention give up in this case. */
173               written = -1;
174               goto exit;
175           }
176
177           /* Try to read sth. in order to break up possible deadlock */
178           if (dsi_peek(dsi) != 0) {
179               written = -1;
180               goto exit;
181           }
182           /* Now try writing again */
183           continue;
184       }
185
186       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
187       written = -1;
188       goto exit;
189   }
190
191   dsi->write_count += written;
192
193 exit:
194   if (setnonblock(dsi->socket, 0) < 0) {
195       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
196       written = -1;
197   }
198
199   dsi->in_write--;
200   return written;
201 }
202
203
204 /* ---------------------------------
205 */
206 #ifdef WITH_SENDFILE
207 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
208 {
209   size_t written;
210   ssize_t len;
211
212   dsi->in_write++;
213   written = 0;
214
215   /* non blocking mode */
216   if (setnonblock(dsi->socket, 1) < 0) {
217       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
218       return -1;
219   }
220
221   while (written < length) {
222     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
223         
224     if (len < 0) {
225       if (errno == EINTR)
226           continue;
227       if (errno == EINVAL || errno == ENOSYS)
228           return -1;
229           
230       if (errno == EAGAIN || errno == EWOULDBLOCK) {
231           if (dsi_peek(dsi)) {
232               /* can't go back to blocking mode, exit, the next read
233                  will return with an error and afpd will die.
234               */
235               break;
236           }
237           continue;
238       }
239       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
240       break;
241     }
242     else if (!len) {
243         /* afpd is going to exit */
244         errno = EIO;
245         return -1; /* I think we're at EOF here... */
246     }
247     else 
248         written += len;
249   }
250
251   if (setnonblock(dsi->socket, 0) < 0) {
252       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
253       return -1;
254   }
255
256   dsi->write_count += written;
257   dsi->in_write--;
258   return written;
259 }
260 #endif
261
262 /* 
263  * Return all bytes up to count from dsi->buffer if there are any buffered there
264  */
265 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
266 {
267     size_t nbe = 0;
268     
269     if (dsi->start) {        
270         nbe = dsi->eof - dsi->start;
271
272         if (nbe > 0) {
273            nbe = min((size_t)nbe, count);
274            memcpy(buf, dsi->start, nbe);
275            dsi->start += nbe;
276
277            if (dsi->eof == dsi->start) 
278                dsi->start = dsi->eof = dsi->buffer;
279
280         }
281     }
282     return nbe;
283 }
284
285 /*
286  * Get bytes from buffer dsi->buffer or read from socket
287  *
288  * 1. Check if there are bytes in the the dsi->buffer buffer.
289  * 2. Return bytes from (1) if yes.
290  *    Note: this may return fewer bytes then requested in count !!
291  * 3. If the buffer was empty, read from the socket.
292  */
293 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
294 {
295     ssize_t nbe;
296     
297     if (!count)
298         return 0;
299
300     nbe = from_buf(dsi, buf, count); /* 1. */
301     if (nbe)
302         return nbe;             /* 2. */
303   
304     return read(dsi->socket, buf, count); /* 3. */
305 }
306
307 /*
308  * Essentially a loop around buf_read() to ensure "length" bytes are read
309  * from dsi->buffer and/or the socket.
310  */
311 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
312 {
313   size_t stored;
314   ssize_t len;
315
316   stored = 0;
317   while (stored < length) {
318     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
319     if (len == -1 && errno == EINTR) {
320       continue;
321     } else if (len > 0) {
322       stored += len;
323     } else { /* eof or error */
324       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
325       if (len || stored || dsi->read_count) {
326           if (! (dsi->flags & DSI_DISCONNECTED))
327               LOG(log_error, logtype_dsi, "dsi_stream_read(fd: %i): len:%d, %s",
328                   dsi->socket, len, (len < 0) ? strerror(errno) : "unexpected EOF");
329       }
330       break;
331     }
332   }
333
334   dsi->read_count += stored;
335   return stored;
336 }
337
338 /*
339  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
340  * this tries to read larger chunks (8192 bytes) into a buffer.
341  */
342 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
343 {
344   size_t len;
345   size_t buflen;
346   
347   dsi_init_buffer(dsi);
348   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
349   dsi->read_count += len;
350   if (len == length) {          /* got enough bytes from there ? */
351       return len;               /* yes */
352   }
353
354   /* fill the buffer with 8192 bytes or until buffer is full */
355   buflen = min(8192, dsi->end - dsi->eof);
356   if (buflen > 0) {
357       ssize_t ret;
358       ret = read(dsi->socket, dsi->eof, buflen);
359       if (ret > 0)
360           dsi->eof += ret;
361   }
362
363   /* now get the remaining data */
364   len += dsi_stream_read(dsi, data + len, length - len);
365   return len;
366 }
367
368 /* ---------------------------------------
369 */
370 static void block_sig(DSI *dsi)
371 {
372   dsi->in_write++;
373 }
374
375 /* ---------------------------------------
376 */
377 static void unblock_sig(DSI *dsi)
378 {
379   dsi->in_write--;
380 }
381
382 /* ---------------------------------------
383  * write data. 0 on failure. this assumes that dsi_len will never
384  * cause an overflow in the data buffer. 
385  */
386 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
387 {
388   char block[DSI_BLOCKSIZ];
389 #ifdef USE_WRITEV
390   struct iovec iov[2];
391   size_t towrite;
392   ssize_t len;
393 #endif /* USE_WRITEV */
394
395   block[0] = dsi->header.dsi_flags;
396   block[1] = dsi->header.dsi_command;
397   memcpy(block + 2, &dsi->header.dsi_requestID, 
398          sizeof(dsi->header.dsi_requestID));
399   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
400   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
401   memcpy(block + 12, &dsi->header.dsi_reserved,
402          sizeof(dsi->header.dsi_reserved));
403
404   if (!length) { /* just write the header */
405     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
406     return length; /* really 0 on failure, 1 on success */
407   }
408   
409   /* block signals */
410   block_sig(dsi);
411 #ifdef USE_WRITEV
412   iov[0].iov_base = block;
413   iov[0].iov_len = sizeof(block);
414   iov[1].iov_base = buf;
415   iov[1].iov_len = length;
416   
417   towrite = sizeof(block) + length;
418   dsi->write_count += towrite;
419   while (towrite > 0) {
420     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
421         !len)
422       continue;
423     
424     if ((size_t)len == towrite) /* wrote everything out */
425       break;
426     else if (len < 0) { /* error */
427       if (errno == EAGAIN || errno == EWOULDBLOCK) {
428           if (!dsi_peek(dsi)) {
429               continue;
430           }
431       }
432       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
433       unblock_sig(dsi);
434       return 0;
435     }
436     
437     towrite -= len;
438     if (towrite > length) { /* skip part of header */
439       iov[0].iov_base = (char *) iov[0].iov_base + len;
440       iov[0].iov_len -= len;
441     } else { /* skip to data */
442       if (iov[0].iov_len) {
443         len -= iov[0].iov_len;
444         iov[0].iov_len = 0;
445       }
446       iov[1].iov_base = (char *) iov[1].iov_base + len;
447       iov[1].iov_len -= len;
448     }
449   }
450   
451 #else /* USE_WRITEV */
452   /* write the header then data */
453   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
454             (dsi_stream_write(dsi, buf, length, 0) != length)) {
455       unblock_sig(dsi);
456       return 0;
457   }
458 #endif /* USE_WRITEV */
459
460   unblock_sig(dsi);
461   return 1;
462 }
463
464
465 /* ---------------------------------------
466  * read data. function on success. 0 on failure. data length gets
467  * stored in length variable. this should really use size_t's, but
468  * that would require changes elsewhere. */
469 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
470                        size_t *rlength)
471 {
472   char block[DSI_BLOCKSIZ];
473
474   /* read in the header */
475   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
476     return 0;
477
478   dsi->header.dsi_flags = block[0];
479   dsi->header.dsi_command = block[1];
480   /* FIXME, not the right place, 
481      but we get a server disconnect without reason in the log
482   */
483   if (!block[1]) {
484       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
485       return 0;
486   }
487
488   memcpy(&dsi->header.dsi_requestID, block + 2, 
489          sizeof(dsi->header.dsi_requestID));
490   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
491   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
492   memcpy(&dsi->header.dsi_reserved, block + 12,
493          sizeof(dsi->header.dsi_reserved));
494   dsi->clientID = ntohs(dsi->header.dsi_requestID);
495   
496   /* make sure we don't over-write our buffers. */
497   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
498   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
499     return 0;
500
501   return block[1];
502 }