]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Remove all Appletalk stuff
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37 #include <atalk/dsi.h>
38 #include <atalk/util.h>
39
40 #define min(a,b)  ((a) < (b) ? (a) : (b))
41
42 #ifndef MSG_MORE
43 #define MSG_MORE 0x8000
44 #endif
45
46 #ifndef MSG_DONTWAIT
47 #define MSG_DONTWAIT 0x40
48 #endif
49
50 /* ------------------------- 
51  * we don't use a circular buffer.
52 */
53 static void dsi_init_buffer(DSI *dsi)
54 {
55     if (!dsi->buffer) {
56         /* XXX config options */
57         dsi->maxsize = 6 * dsi->server_quantum;
58         if (!dsi->maxsize)
59             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
60         dsi->buffer = malloc(dsi->maxsize);
61         if (!dsi->buffer) {
62             return;
63         }
64         dsi->start = dsi->buffer;
65         dsi->eof = dsi->buffer;
66         dsi->end = dsi->buffer + dsi->maxsize;
67     }
68 }
69
70 /* ---------------------- 
71    afpd is sleeping too much while trying to send something.
72    May be there's no reader or the reader is also sleeping in write,
73    look if there's some data for us to read, hopefully it will wake up
74    the reader so we can write again.
75 */
76 static int dsi_peek(DSI *dsi)
77 {
78     fd_set readfds, writefds;
79     int    len;
80     int    maxfd;
81     int    ret;
82
83     FD_ZERO(&readfds);
84     FD_ZERO(&writefds);
85     FD_SET( dsi->socket, &readfds);
86     FD_SET( dsi->socket, &writefds);
87     maxfd = dsi->socket +1;
88
89     while (1) {
90         FD_SET( dsi->socket, &readfds);
91         FD_SET( dsi->socket, &writefds);
92
93         /* No timeout: if there's nothing to read nor nothing to write,
94          * we've got nothing to do at all */
95         if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
96             if (ret == -1 && errno == EINTR)
97                 /* we might have been interrupted by out timer, so restart select */
98                 continue;
99             /* give up */
100             break;
101         }
102
103         /* Check if there's sth to read, hopefully reading that will unblock the client */
104         if (FD_ISSET(dsi->socket, &readfds)) {
105             dsi_init_buffer(dsi);
106             len = dsi->end - dsi->eof;
107
108             if (len <= 0) {
109                 /* ouch, our buffer is full ! fall back to blocking IO 
110                  * could block and disconnect but it's better than a cpu hog */
111                 break;
112             }
113
114             len = read(dsi->socket, dsi->eof, len);
115             if (len <= 0)
116                 break;
117             dsi->eof += len;
118         }
119
120         if (FD_ISSET(dsi->socket, &writefds))
121             /* we can write again at last */
122             break;
123     }
124
125     return 0;
126 }
127
128 /* ------------------------------
129  * write raw data. return actual bytes read. checks against EINTR
130  * aren't necessary if all of the signals have SA_RESTART
131  * specified. */
132 ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
133 {
134   size_t written;
135   ssize_t len;
136   unsigned int flags = 0;
137
138   dsi->in_write++;
139   written = 0;
140
141   LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
142
143   /* non blocking mode */
144   if (setnonblock(dsi->socket, 1) < 0) {
145       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
146       return -1;
147   }
148
149   while (written < length) {
150       len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
151       if (len >= 0) {
152           written += len;
153           continue;
154       }
155
156       if (errno == EINTR)
157           continue;
158
159       if (errno == EAGAIN || errno == EWOULDBLOCK) {
160           if (mode == DSI_NOWAIT && written == 0) {
161               /* DSI_NOWAIT is used by attention give up in this case. */
162               written = -1;
163               goto exit;
164           }
165
166           /* Try to read sth. in order to break up possible deadlock */
167           if (dsi_peek(dsi) != 0) {
168               written = -1;
169               goto exit;
170           }
171           /* Now try writing again */
172           continue;
173       }
174
175       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
176       written = -1;
177       goto exit;
178   }
179
180   dsi->write_count += written;
181
182 exit:
183   if (setnonblock(dsi->socket, 0) < 0) {
184       LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
185       written = -1;
186   }
187
188   dsi->in_write--;
189   return written;
190 }
191
192
193 /* ---------------------------------
194 */
195 #ifdef WITH_SENDFILE
196 ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
197 {
198   size_t written;
199   ssize_t len;
200
201   dsi->in_write++;
202   written = 0;
203
204   /* non blocking mode */
205   if (setnonblock(dsi->socket, 1) < 0) {
206       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
207       return -1;
208   }
209
210   while (written < length) {
211     len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
212         
213     if (len < 0) {
214       if (errno == EINTR)
215           continue;
216       if (errno == EINVAL || errno == ENOSYS)
217           return -1;
218           
219       if (errno == EAGAIN || errno == EWOULDBLOCK) {
220           if (dsi_peek(dsi)) {
221               /* can't go back to blocking mode, exit, the next read
222                  will return with an error and afpd will die.
223               */
224               break;
225           }
226           continue;
227       }
228       LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
229       break;
230     }
231     else if (!len) {
232         /* afpd is going to exit */
233         errno = EIO;
234         return -1; /* I think we're at EOF here... */
235     }
236     else 
237         written += len;
238   }
239
240   if (setnonblock(dsi->socket, 0) < 0) {
241       LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
242       return -1;
243   }
244
245   dsi->write_count += written;
246   dsi->in_write--;
247   return written;
248 }
249 #endif
250
251 /* 
252  * Return all bytes up to count from dsi->buffer if there are any buffered there
253  */
254 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
255 {
256     size_t nbe = 0;
257     
258     if (dsi->start) {        
259         nbe = dsi->eof - dsi->start;
260
261         if (nbe > 0) {
262            nbe = min((size_t)nbe, count);
263            memcpy(buf, dsi->start, nbe);
264            dsi->start += nbe;
265
266            if (dsi->eof == dsi->start) 
267                dsi->start = dsi->eof = dsi->buffer;
268
269         }
270     }
271     return nbe;
272 }
273
274 /*
275  * Get bytes from buffer dsi->buffer or read from socket
276  *
277  * 1. Check if there are bytes in the the dsi->buffer buffer.
278  * 2. Return bytes from (1) if yes.
279  *    Note: this may return fewer bytes then requested in count !!
280  * 3. If the buffer was empty, read from the socket.
281  */
282 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
283 {
284     ssize_t nbe;
285     
286     if (!count)
287         return 0;
288
289     nbe = from_buf(dsi, buf, count); /* 1. */
290     if (nbe)
291         return nbe;             /* 2. */
292   
293     return read(dsi->socket, buf, count); /* 3. */
294 }
295
296 /*
297  * Essentially a loop around buf_read() to ensure "length" bytes are read
298  * from dsi->buffer and/or the socket.
299  */
300 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
301 {
302   size_t stored;
303   ssize_t len;
304
305   stored = 0;
306   while (stored < length) {
307     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
308     if (len == -1 && errno == EINTR) {
309       continue;
310     } else if (len > 0) {
311       stored += len;
312     } else { /* eof or error */
313       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
314       if (len || stored || dsi->read_count) {
315           if (! (dsi->flags & DSI_DISCONNECTED))
316               LOG(log_error, logtype_dsi, "dsi_stream_read(fd: %i): len:%d, %s",
317                   dsi->socket, len, (len < 0) ? strerror(errno) : "unexpected EOF");
318       }
319       break;
320     }
321   }
322
323   dsi->read_count += stored;
324   return stored;
325 }
326
327 /*
328  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
329  * this tries to read larger chunks (8192 bytes) into a buffer.
330  */
331 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
332 {
333   size_t len;
334   size_t buflen;
335   
336   dsi_init_buffer(dsi);
337   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
338   dsi->read_count += len;
339   if (len == length) {          /* got enough bytes from there ? */
340       return len;               /* yes */
341   }
342
343   /* fill the buffer with 8192 bytes or until buffer is full */
344   buflen = min(8192, dsi->end - dsi->eof);
345   if (buflen > 0) {
346       ssize_t ret;
347       ret = read(dsi->socket, dsi->eof, buflen);
348       if (ret > 0)
349           dsi->eof += ret;
350   }
351
352   /* now get the remaining data */
353   len += dsi_stream_read(dsi, data + len, length - len);
354   return len;
355 }
356
357 /* ---------------------------------------
358 */
359 void dsi_sleep(DSI *dsi, const int state)
360 {
361     if (state)
362         dsi->flags |= DSI_SLEEPING;
363     else
364         dsi->flags &= ~DSI_SLEEPING;
365 }
366
367 /* ---------------------------------------
368 */
369 static void block_sig(DSI *dsi)
370 {
371   dsi->in_write++;
372 }
373
374 /* ---------------------------------------
375 */
376 static void unblock_sig(DSI *dsi)
377 {
378   dsi->in_write--;
379 }
380
381 /* ---------------------------------------
382  * write data. 0 on failure. this assumes that dsi_len will never
383  * cause an overflow in the data buffer. 
384  */
385 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
386 {
387   char block[DSI_BLOCKSIZ];
388 #ifdef USE_WRITEV
389   struct iovec iov[2];
390   size_t towrite;
391   ssize_t len;
392 #endif /* USE_WRITEV */
393
394   block[0] = dsi->header.dsi_flags;
395   block[1] = dsi->header.dsi_command;
396   memcpy(block + 2, &dsi->header.dsi_requestID, 
397          sizeof(dsi->header.dsi_requestID));
398   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
399   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
400   memcpy(block + 12, &dsi->header.dsi_reserved,
401          sizeof(dsi->header.dsi_reserved));
402
403   if (!length) { /* just write the header */
404     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
405     return length; /* really 0 on failure, 1 on success */
406   }
407   
408   /* block signals */
409   block_sig(dsi);
410 #ifdef USE_WRITEV
411   iov[0].iov_base = block;
412   iov[0].iov_len = sizeof(block);
413   iov[1].iov_base = buf;
414   iov[1].iov_len = length;
415   
416   towrite = sizeof(block) + length;
417   dsi->write_count += towrite;
418   while (towrite > 0) {
419     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
420         !len)
421       continue;
422     
423     if ((size_t)len == towrite) /* wrote everything out */
424       break;
425     else if (len < 0) { /* error */
426       if (errno == EAGAIN || errno == EWOULDBLOCK) {
427           if (!dsi_peek(dsi)) {
428               continue;
429           }
430       }
431       LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
432       unblock_sig(dsi);
433       return 0;
434     }
435     
436     towrite -= len;
437     if (towrite > length) { /* skip part of header */
438       iov[0].iov_base = (char *) iov[0].iov_base + len;
439       iov[0].iov_len -= len;
440     } else { /* skip to data */
441       if (iov[0].iov_len) {
442         len -= iov[0].iov_len;
443         iov[0].iov_len = 0;
444       }
445       iov[1].iov_base = (char *) iov[1].iov_base + len;
446       iov[1].iov_len -= len;
447     }
448   }
449   
450 #else /* USE_WRITEV */
451   /* write the header then data */
452   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
453             (dsi_stream_write(dsi, buf, length, 0) != length)) {
454       unblock_sig(dsi);
455       return 0;
456   }
457 #endif /* USE_WRITEV */
458
459   unblock_sig(dsi);
460   return 1;
461 }
462
463
464 /* ---------------------------------------
465  * read data. function on success. 0 on failure. data length gets
466  * stored in length variable. this should really use size_t's, but
467  * that would require changes elsewhere. */
468 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
469                        size_t *rlength)
470 {
471   char block[DSI_BLOCKSIZ];
472
473   /* read in the header */
474   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
475     return 0;
476
477   dsi->header.dsi_flags = block[0];
478   dsi->header.dsi_command = block[1];
479   /* FIXME, not the right place, 
480      but we get a server disconnect without reason in the log
481   */
482   if (!block[1]) {
483       LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
484       return 0;
485   }
486
487   memcpy(&dsi->header.dsi_requestID, block + 2, 
488          sizeof(dsi->header.dsi_requestID));
489   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
490   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
491   memcpy(&dsi->header.dsi_reserved, block + 12,
492          sizeof(dsi->header.dsi_reserved));
493   dsi->clientID = ntohs(dsi->header.dsi_requestID);
494   
495   /* make sure we don't over-write our buffers. */
496   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
497   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
498     return 0;
499
500   return block[1];
501 }