]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
on linux put sendfile back
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.18 2009-10-25 09:47:05 didg Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37
38 #include <atalk/dsi.h>
39 #include <netatalk/endian.h>
40 #include <atalk/util.h>
41 #include <sys/ioctl.h> 
42
43 #define min(a,b)  ((a) < (b) ? (a) : (b))
44
45 #ifndef MSG_MORE
46 #define MSG_MORE 0x8000
47 #endif
48
49 #ifndef MSG_DONTWAIT
50 #define MSG_DONTWAIT 0x40
51 #endif
52
53 /* ------------------------- 
54  * we don't use a circular buffer.
55 */
56 static void dsi_init_buffer(DSI *dsi)
57 {
58     if (!dsi->buffer) {
59         /* XXX config options */
60         dsi->maxsize = 6 * dsi->server_quantum;
61         if (!dsi->maxsize)
62             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
63         dsi->buffer = malloc(dsi->maxsize);
64         if (!dsi->buffer) {
65             return;
66         }
67         dsi->start = dsi->buffer;
68         dsi->eof = dsi->buffer;
69         dsi->end = dsi->buffer + dsi->maxsize;
70     }
71 }
72
73 /* ---------------------- 
74    afpd is sleeping too much while trying to send something.
75    May be there's no reader or the reader is also sleeping in write,
76    look if there's some data for us to read, hopefully it will wake up
77    the reader
78 */
79 static int dsi_buffer(DSI *dsi)
80 {
81     fd_set readfds, writefds;
82     int    len;
83     int    maxfd;
84     int adr;
85
86     /* non blocking mode */
87     adr = 1;
88     if (ioctl(dsi->socket, FIONBIO, &adr) < 0) {
89         /* can't do it! exit without error it will sleep to death below */
90         LOG(log_error, logtype_default, "dsi_buffer: ioctl non blocking mode %s", strerror(errno));
91         return 0;
92     }
93     
94     FD_ZERO(&readfds);
95     FD_ZERO(&writefds);
96     FD_SET( dsi->socket, &readfds);
97     FD_SET( dsi->socket, &writefds);
98     maxfd = dsi->socket +1;
99     while (1) {
100         FD_SET( dsi->socket, &readfds);
101         FD_SET( dsi->socket, &writefds);
102         if (select( maxfd, &readfds, &writefds, NULL, NULL) <= 0)
103             break;
104
105         if ( !FD_ISSET(dsi->socket, &readfds)) {
106             /* nothing waiting in the read queue */
107             break;
108         }
109         dsi_init_buffer(dsi);
110         len = dsi->end - dsi->eof;
111
112         if (len <= 0) {
113             /* ouch, our buffer is full ! 
114              * fall back to blocking IO 
115              * could block and disconnect but it's better than a cpu hog
116              */
117             break;
118         }
119
120         len = read(dsi->socket, dsi->eof, len);
121         if (len <= 0)
122             break;
123         dsi->eof += len;
124         if ( FD_ISSET(dsi->socket, &writefds)) {
125             /* we can write again at last */
126             break;
127         }
128     }
129     adr = 0;
130     if (ioctl(dsi->socket, FIONBIO, &adr) < 0) {
131         /* can't do it! afpd will fail very quickly */
132         LOG(log_error, logtype_default, "dsi_buffer: ioctl blocking mode %s", strerror(errno));
133         return -1;
134     }
135     return 0;
136 }
137
138 /* ------------------------------
139  * write raw data. return actual bytes read. checks against EINTR
140  * aren't necessary if all of the signals have SA_RESTART
141  * specified. */
142 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
143 {
144   size_t written;
145   ssize_t len;
146 #if 0
147   /* FIXME sometime it's slower */
148   unsigned int flags = (mode)?MSG_MORE:0;
149 #endif
150   unsigned int flags = 0;
151
152 #if 0
153   /* XXX there's no MSG_DONTWAIT in recv ?? so we have to play with ioctl
154   */ 
155   flags |= MSG_DONTWAIT;
156 #endif
157   
158   dsi->in_write++;
159   written = 0;
160   while (written < length) {
161     if ((-1 == (len = send(dsi->socket, (u_int8_t *) data + written,
162                       length - written, flags)) && errno == EINTR) ||
163         !len)
164       continue;
165
166     if (len < 0) {
167       if (errno == EAGAIN || errno == EWOULDBLOCK) {
168           if (mode == DSI_NOWAIT && written == 0) {
169               /* DSI_NOWAIT is used by attention
170                  give up in this case.
171               */
172               return -1;
173           }
174           if (dsi_buffer(dsi)) {
175               /* can't go back to blocking mode, exit, the next read
176                  will return with an error and afpd will die.
177               */
178               break;
179           }
180           continue;
181       }
182       LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
183       break;
184     }
185     else {
186         written += len;
187     }
188   }
189
190   dsi->write_count += written;
191   dsi->in_write--;
192   return written;
193 }
194
195
196 /* ---------------------------------
197 */
198 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
199 {
200   size_t written;
201   ssize_t len;
202
203   dsi->in_write++;
204   written = 0;
205
206   while (written < length) {
207     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
208         
209     if (len < 0) {
210       if (errno == EINTR)
211           continue;
212       if (errno == EINVAL || errno == ENOSYS)
213           return -1;
214           
215       if (errno == EAGAIN || errno == EWOULDBLOCK) {
216           if (dsi_buffer(dsi)) {
217               /* can't go back to blocking mode, exit, the next read
218                  will return with an error and afpd will die.
219               */
220               break;
221           }
222           continue;
223       }
224       LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
225       break;
226     }
227     else if (!len) {
228         /* afpd is going to exit */
229         errno = EIO;
230         return -1; /* I think we're at EOF here... */
231     }
232     else 
233         written += len;
234   }
235
236   dsi->write_count += written;
237   dsi->in_write--;
238   return written;
239 }
240
241 /* ---------------------------------
242 */
243 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
244 {
245     size_t nbe = 0;
246     
247     if (dsi->start) {        
248         nbe = dsi->eof - dsi->start;
249
250         if (nbe > 0) {
251            nbe = min((size_t)nbe, count);
252            memcpy(buf, dsi->start, nbe);
253            dsi->start += nbe;
254
255            if (dsi->eof == dsi->start) 
256                dsi->start = dsi->eof = dsi->buffer;
257
258         }
259     }
260     return nbe;
261 }
262
263 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
264 {
265     ssize_t nbe;
266     
267     if (!count)
268         return 0;
269
270     nbe = from_buf(dsi, buf, count);
271     if (nbe)
272         return nbe;
273   
274     return read(dsi->socket, buf, count);
275
276 }
277
278 /* ---------------------------------------
279  * read raw data. return actual bytes read. this will wait until 
280  * it gets length bytes 
281  */
282 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
283 {
284   size_t stored;
285   ssize_t len;
286   
287   stored = 0;
288   while (stored < length) {
289     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
290     if (len == -1 && errno == EINTR)
291       continue;
292     else if (len > 0)
293       stored += len;
294     else { /* eof or error */
295       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
296       if (len || stored || dsi->read_count) {
297           LOG(log_error, logtype_default, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
298       }
299       break;
300     }
301   }
302
303   dsi->read_count += stored;
304   return stored;
305 }
306
307 /* ---------------------------------------
308  * read raw data. return actual bytes read. this will wait until 
309  * it gets length bytes 
310  */
311 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
312 {
313   size_t len;
314   size_t buflen;
315   
316   dsi_init_buffer(dsi);
317   len = from_buf(dsi, data, length);
318   dsi->read_count += len;
319   if (len == length) {
320       return len;
321   }
322   
323   buflen = min(8192, dsi->end - dsi->eof);
324   if (buflen > 0) {
325       ssize_t ret;
326       ret = read(dsi->socket, dsi->eof, buflen);
327       if (ret > 0)
328           dsi->eof += ret;
329   }
330   return dsi_stream_read(dsi, data, length -len);
331 }
332
333 /* ---------------------------------------
334 */
335 void dsi_sleep(DSI *dsi, const int state)
336 {
337     dsi->asleep = state;
338 }
339
340 /* ---------------------------------------
341 */
342 static void block_sig(DSI *dsi)
343 {
344   dsi->in_write++;
345 }
346
347 /* ---------------------------------------
348 */
349 static void unblock_sig(DSI *dsi)
350 {
351   dsi->in_write--;
352 }
353
354 /* ---------------------------------------
355  * write data. 0 on failure. this assumes that dsi_len will never
356  * cause an overflow in the data buffer. 
357  */
358 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
359 {
360   char block[DSI_BLOCKSIZ];
361 #ifdef USE_WRITEV
362   struct iovec iov[2];
363   size_t towrite;
364   ssize_t len;
365 #endif /* USE_WRITEV */
366
367   block[0] = dsi->header.dsi_flags;
368   block[1] = dsi->header.dsi_command;
369   memcpy(block + 2, &dsi->header.dsi_requestID, 
370          sizeof(dsi->header.dsi_requestID));
371   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
372   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
373   memcpy(block + 12, &dsi->header.dsi_reserved,
374          sizeof(dsi->header.dsi_reserved));
375
376   if (!length) { /* just write the header */
377     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
378     return length; /* really 0 on failure, 1 on success */
379   }
380   
381   /* block signals */
382   block_sig(dsi);
383 #ifdef USE_WRITEV
384   iov[0].iov_base = block;
385   iov[0].iov_len = sizeof(block);
386   iov[1].iov_base = buf;
387   iov[1].iov_len = length;
388   
389   towrite = sizeof(block) + length;
390   dsi->write_count += towrite;
391   while (towrite > 0) {
392     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
393         !len)
394       continue;
395     
396     if ((size_t)len == towrite) /* wrote everything out */
397       break;
398     else if (len < 0) { /* error */
399       if (errno == EAGAIN || errno == EWOULDBLOCK) {
400           if (!dsi_buffer(dsi)) {
401               continue;
402           }
403       }
404       LOG(log_error, logtype_default, "dsi_stream_send: %s", strerror(errno));
405       unblock_sig(dsi);
406       return 0;
407     }
408     
409     towrite -= len;
410     if (towrite > length) { /* skip part of header */
411       iov[0].iov_base = (char *) iov[0].iov_base + len;
412       iov[0].iov_len -= len;
413     } else { /* skip to data */
414       if (iov[0].iov_len) {
415         len -= iov[0].iov_len;
416         iov[0].iov_len = 0;
417       }
418       iov[1].iov_base = (char *) iov[1].iov_base + len;
419       iov[1].iov_len -= len;
420     }
421   }
422   
423 #else /* USE_WRITEV */
424   /* write the header then data */
425   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
426             (dsi_stream_write(dsi, buf, length, 0) != length)) {
427       unblock_sig(dsi);
428       return 0;
429   }
430 #endif /* USE_WRITEV */
431
432   unblock_sig(dsi);
433   return 1;
434 }
435
436
437 /* ---------------------------------------
438  * read data. function on success. 0 on failure. data length gets
439  * stored in length variable. this should really use size_t's, but
440  * that would require changes elsewhere. */
441 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
442                        size_t *rlength)
443 {
444   char block[DSI_BLOCKSIZ];
445
446   /* read in the header */
447   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
448     return 0;
449
450   dsi->header.dsi_flags = block[0];
451   dsi->header.dsi_command = block[1];
452   /* FIXME, not the right place, 
453      but we get a server disconnect without reason in the log
454   */
455   if (!block[1]) {
456       LOG(log_error, logtype_default, "dsi_stream_receive: invalid packet, fatal");
457       return 0;
458   }
459
460   memcpy(&dsi->header.dsi_requestID, block + 2, 
461          sizeof(dsi->header.dsi_requestID));
462   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
463   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
464   memcpy(&dsi->header.dsi_reserved, block + 12,
465          sizeof(dsi->header.dsi_reserved));
466   dsi->clientID = ntohs(dsi->header.dsi_requestID);
467   
468   /* make sure we don't over-write our buffers. */
469   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
470   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
471     return 0;
472
473   return block[1];
474 }