]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Merge branch 'master' into branch-findercache
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <netatalk/endian.h>
39 #include <atalk/util.h>
40
41 #define min(a,b)  ((a) < (b) ? (a) : (b))
42
43 #ifndef MSG_MORE
44 #define MSG_MORE 0x8000
45 #endif
46
47 #ifndef MSG_DONTWAIT
48 #define MSG_DONTWAIT 0x40
49 #endif
50
51 /* ------------------------- 
52  * we don't use a circular buffer.
53 */
54 static void dsi_init_buffer(DSI *dsi)
55 {
56     if (!dsi->buffer) {
57         /* XXX config options */
58         dsi->maxsize = 6 * dsi->server_quantum;
59         if (!dsi->maxsize)
60             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61         dsi->buffer = malloc(dsi->maxsize);
62         if (!dsi->buffer) {
63             return;
64         }
65         dsi->start = dsi->buffer;
66         dsi->eof = dsi->buffer;
67         dsi->end = dsi->buffer + dsi->maxsize;
68     }
69 }
70
71 /* ---------------------- 
72    afpd is sleeping too much while trying to send something.
73    May be there's no reader or the reader is also sleeping in write,
74    look if there's some data for us to read, hopefully it will wake up
75    the reader
76 */
77 static int dsi_buffer(DSI *dsi)
78 {
79     fd_set readfds, writefds;
80     int    len;
81     int    maxfd;
82
83     LOG(log_maxdebug, logtype_dsi, "dsi_buffer: switching to non-blocking IO");
84
85     /* non blocking mode */
86     if (setnonblock(dsi->socket, 1) < 0) {
87         /* can't do it! exit without error it will sleep to death below */
88         LOG(log_error, logtype_dsi, "dsi_buffer: ioctl non blocking mode %s", strerror(errno));
89         return 0;
90     }
91     
92     FD_ZERO(&readfds);
93     FD_ZERO(&writefds);
94     FD_SET( dsi->socket, &readfds);
95     FD_SET( dsi->socket, &writefds);
96     maxfd = dsi->socket +1;
97     while (1) {
98         FD_SET( dsi->socket, &readfds);
99         FD_SET( dsi->socket, &writefds);
100         if (select( maxfd, &readfds, &writefds, NULL, NULL) <= 0)
101             break;
102
103         if ( !FD_ISSET(dsi->socket, &readfds)) {
104             /* nothing waiting in the read queue */
105             break;
106         }
107         dsi_init_buffer(dsi);
108         len = dsi->end - dsi->eof;
109
110         if (len <= 0) {
111             /* ouch, our buffer is full ! 
112              * fall back to blocking IO 
113              * could block and disconnect but it's better than a cpu hog
114              */
115             break;
116         }
117
118         len = read(dsi->socket, dsi->eof, len);
119         if (len <= 0)
120             break;
121         dsi->eof += len;
122         if ( FD_ISSET(dsi->socket, &writefds)) {
123             /* we can write again at last */
124             break;
125         }
126     }
127
128     LOG(log_maxdebug, logtype_dsi, "dsi_buffer: switching back to blocking IO");
129
130     if (setnonblock(dsi->socket, 0) < 0) {
131         /* can't do it! afpd will fail very quickly */
132         LOG(log_error, logtype_dsi, "dsi_buffer: ioctl blocking mode %s", strerror(errno));
133         return -1;
134     }
135     return 0;
136 }
137
138 /* ------------------------------
139  * write raw data. return actual bytes read. checks against EINTR
140  * aren't necessary if all of the signals have SA_RESTART
141  * specified. */
142 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
143 {
144   size_t written;
145   ssize_t len;
146 #if 0
147   /* FIXME sometime it's slower */
148   unsigned int flags = (mode)?MSG_MORE:0;
149 #endif
150   unsigned int flags = 0;
151
152 #if 0
153   /* XXX there's no MSG_DONTWAIT in recv ?? so we have to play with ioctl
154   */ 
155   flags |= MSG_DONTWAIT;
156 #endif
157   
158   dsi->in_write++;
159   written = 0;
160
161   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
162
163   while (written < length) {
164       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
165       if ((len == 0) || (len == -1 && errno == EINTR))
166           continue;
167
168     if (len < 0) {
169       if (errno == EAGAIN || errno == EWOULDBLOCK) {
170           if (mode == DSI_NOWAIT && written == 0) {
171               /* DSI_NOWAIT is used by attention give up in this case. */
172               return -1;
173           }
174           if (dsi_buffer(dsi)) {
175               /* can't go back to blocking mode, exit, the next read
176                  will return with an error and afpd will die.
177               */
178               break;
179           }
180           continue;
181       }
182       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
183       break;
184     }
185     else {
186         written += len;
187     }
188   }
189
190   dsi->write_count += written;
191   dsi->in_write--;
192   return written;
193 }
194
195
196 /* ---------------------------------
197 */
198 #ifdef WITH_SENDFILE
199 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
200 {
201   size_t written;
202   ssize_t len;
203
204   dsi->in_write++;
205   written = 0;
206
207   while (written < length) {
208     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
209         
210     if (len < 0) {
211       if (errno == EINTR)
212           continue;
213       if (errno == EINVAL || errno == ENOSYS)
214           return -1;
215           
216       if (errno == EAGAIN || errno == EWOULDBLOCK) {
217           if (dsi_buffer(dsi)) {
218               /* can't go back to blocking mode, exit, the next read
219                  will return with an error and afpd will die.
220               */
221               break;
222           }
223           continue;
224       }
225       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
226       break;
227     }
228     else if (!len) {
229         /* afpd is going to exit */
230         errno = EIO;
231         return -1; /* I think we're at EOF here... */
232     }
233     else 
234         written += len;
235   }
236
237   dsi->write_count += written;
238   dsi->in_write--;
239   return written;
240 }
241 #endif
242
243 /* 
244  * Return all bytes up to count from dsi->buffer if there are any buffered there
245  */
246 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
247 {
248     size_t nbe = 0;
249     
250     if (dsi->start) {        
251         nbe = dsi->eof - dsi->start;
252
253         if (nbe > 0) {
254            nbe = min((size_t)nbe, count);
255            memcpy(buf, dsi->start, nbe);
256            dsi->start += nbe;
257
258            if (dsi->eof == dsi->start) 
259                dsi->start = dsi->eof = dsi->buffer;
260
261         }
262     }
263     return nbe;
264 }
265
266 /*
267  * Get bytes from buffer dsi->buffer or read from socket
268  *
269  * 1. Check if there are bytes in the the dsi->buffer buffer.
270  * 2. Return bytes from (1) if yes.
271  *    Note: this may return fewer bytes then requested in count !!
272  * 3. If the buffer was empty, read from the socket.
273  */
274 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
275 {
276     ssize_t nbe;
277     
278     if (!count)
279         return 0;
280
281     nbe = from_buf(dsi, buf, count); /* 1. */
282     if (nbe)
283         return nbe;             /* 2. */
284   
285     return read(dsi->socket, buf, count); /* 3. */
286 }
287
288 /*
289  * Essentially a loop around buf_read() to ensure "length" bytes are read
290  * from dsi->buffer and/or the socket.
291  */
292 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
293 {
294   size_t stored;
295   ssize_t len;
296   
297   stored = 0;
298   while (stored < length) {
299     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
300     if (len == -1 && errno == EINTR)
301       continue;
302     else if (len > 0)
303       stored += len;
304     else { /* eof or error */
305       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
306       if (len || stored || dsi->read_count) {
307           LOG(log_error, logtype_dsi, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
308       }
309       break;
310     }
311   }
312
313   dsi->read_count += stored;
314   return stored;
315 }
316
317 /*
318  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
319  * this tries to read larger chunks (8192 bytes) into a buffer.
320  */
321 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
322 {
323   size_t len;
324   size_t buflen;
325   
326   dsi_init_buffer(dsi);
327   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
328   dsi->read_count += len;
329   if (len == length) {          /* got enough bytes from there ? */
330       return len;               /* yes */
331   }
332
333   /* fill the buffer with 8192 bytes or until buffer is full */
334   buflen = min(8192, dsi->end - dsi->eof);
335   if (buflen > 0) {
336       ssize_t ret;
337       ret = read(dsi->socket, dsi->eof, buflen);
338       if (ret > 0)
339           dsi->eof += ret;
340   }
341
342   /* now get the remaining data */
343   len += dsi_stream_read(dsi, data + len, length - len);
344   return len;
345 }
346
347 /* ---------------------------------------
348 */
349 void dsi_sleep(DSI *dsi, const int state)
350 {
351     dsi->asleep = state;
352 }
353
354 /* ---------------------------------------
355 */
356 static void block_sig(DSI *dsi)
357 {
358   dsi->in_write++;
359 }
360
361 /* ---------------------------------------
362 */
363 static void unblock_sig(DSI *dsi)
364 {
365   dsi->in_write--;
366 }
367
368 /* ---------------------------------------
369  * write data. 0 on failure. this assumes that dsi_len will never
370  * cause an overflow in the data buffer. 
371  */
372 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
373 {
374   char block[DSI_BLOCKSIZ];
375 #ifdef USE_WRITEV
376   struct iovec iov[2];
377   size_t towrite;
378   ssize_t len;
379 #endif /* USE_WRITEV */
380
381   block[0] = dsi->header.dsi_flags;
382   block[1] = dsi->header.dsi_command;
383   memcpy(block + 2, &dsi->header.dsi_requestID, 
384          sizeof(dsi->header.dsi_requestID));
385   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
386   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
387   memcpy(block + 12, &dsi->header.dsi_reserved,
388          sizeof(dsi->header.dsi_reserved));
389
390   if (!length) { /* just write the header */
391     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
392     return length; /* really 0 on failure, 1 on success */
393   }
394   
395   /* block signals */
396   block_sig(dsi);
397 #ifdef USE_WRITEV
398   iov[0].iov_base = block;
399   iov[0].iov_len = sizeof(block);
400   iov[1].iov_base = buf;
401   iov[1].iov_len = length;
402   
403   towrite = sizeof(block) + length;
404   dsi->write_count += towrite;
405   while (towrite > 0) {
406     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
407         !len)
408       continue;
409     
410     if ((size_t)len == towrite) /* wrote everything out */
411       break;
412     else if (len < 0) { /* error */
413       if (errno == EAGAIN || errno == EWOULDBLOCK) {
414           if (!dsi_buffer(dsi)) {
415               continue;
416           }
417       }
418       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
419       unblock_sig(dsi);
420       return 0;
421     }
422     
423     towrite -= len;
424     if (towrite > length) { /* skip part of header */
425       iov[0].iov_base = (char *) iov[0].iov_base + len;
426       iov[0].iov_len -= len;
427     } else { /* skip to data */
428       if (iov[0].iov_len) {
429         len -= iov[0].iov_len;
430         iov[0].iov_len = 0;
431       }
432       iov[1].iov_base = (char *) iov[1].iov_base + len;
433       iov[1].iov_len -= len;
434     }
435   }
436   
437 #else /* USE_WRITEV */
438   /* write the header then data */
439   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
440             (dsi_stream_write(dsi, buf, length, 0) != length)) {
441       unblock_sig(dsi);
442       return 0;
443   }
444 #endif /* USE_WRITEV */
445
446   unblock_sig(dsi);
447   return 1;
448 }
449
450
451 /* ---------------------------------------
452  * read data. function on success. 0 on failure. data length gets
453  * stored in length variable. this should really use size_t's, but
454  * that would require changes elsewhere. */
455 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
456                        size_t *rlength)
457 {
458   char block[DSI_BLOCKSIZ];
459
460   /* read in the header */
461   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
462     return 0;
463
464   dsi->header.dsi_flags = block[0];
465   dsi->header.dsi_command = block[1];
466   /* FIXME, not the right place, 
467      but we get a server disconnect without reason in the log
468   */
469   if (!block[1]) {
470       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
471       return 0;
472   }
473
474   memcpy(&dsi->header.dsi_requestID, block + 2, 
475          sizeof(dsi->header.dsi_requestID));
476   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
477   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
478   memcpy(&dsi->header.dsi_reserved, block + 12,
479          sizeof(dsi->header.dsi_reserved));
480   dsi->clientID = ntohs(dsi->header.dsi_requestID);
481   
482   /* make sure we don't over-write our buffers. */
483   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
484   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
485     return 0;
486
487   return block[1];
488 }