]> arthur.barton.de Git - netatalk.git/blob - libatalk/dsi/dsi_stream.c
Fix all remaining warnings from gcc -Wall that can be fixed
[netatalk.git] / libatalk / dsi / dsi_stream.c
1 /*
2  * $Id: dsi_stream.c,v 1.16 2009-10-22 13:40:11 franklahm Exp $
3  *
4  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
5  * All rights reserved. See COPYRIGHT.
6  *
7  * this file provides the following functions:
8  * dsi_stream_write:    just write a bunch of bytes.
9  * dsi_stream_read:     just read a bunch of bytes.
10  * dsi_stream_send:     send a DSI header + data.
11  * dsi_stream_receive:  read a DSI header + data.
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include "config.h"
16 #endif /* HAVE_CONFIG_H */
17
18 #define USE_WRITEV
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #ifdef HAVE_UNISTD_H
24 #include <unistd.h>
25 #endif
26
27 #include <string.h>
28 #include <errno.h>
29 #include <sys/types.h>
30 #include <sys/socket.h>
31
32 #ifdef USE_WRITEV
33 #include <sys/uio.h>
34 #endif
35
36 #include <atalk/logger.h>
37
38 #include <atalk/dsi.h>
39 #include <netatalk/endian.h>
40
41 #define min(a,b)  ((a) < (b) ? (a) : (b))
42
43 #ifndef MSG_MORE
44 #define MSG_MORE 0x8000
45 #endif
46
47 #ifndef MSG_DONTWAIT
48 #define MSG_DONTWAIT 0x40
49 #endif
50
51 /* ------------------------- 
52  * we don't use a circular buffer.
53 */
54 static void dsi_init_buffer(DSI *dsi)
55 {
56     if (!dsi->buffer) {
57         /* XXX config options */
58         dsi->maxsize = 6 * dsi->server_quantum;
59         if (!dsi->maxsize)
60             dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
61         dsi->buffer = malloc(dsi->maxsize);
62         if (!dsi->buffer) {
63             return;
64         }
65         dsi->start = dsi->buffer;
66         dsi->eof = dsi->buffer;
67         dsi->end = dsi->buffer + dsi->maxsize;
68     }
69 }
70
71 /* ---------------------- */
72 static void dsi_buffer(DSI *dsi)
73 {
74     fd_set readfds, writefds;
75     int    len;
76     int    maxfd;
77
78     FD_ZERO(&readfds);
79     FD_ZERO(&writefds);
80     FD_SET( dsi->socket, &readfds);
81     FD_SET( dsi->socket, &writefds);
82     maxfd = dsi->socket +1;
83     while (1) {
84         FD_SET( dsi->socket, &readfds);
85         FD_SET( dsi->socket, &writefds);
86         if (select( maxfd, &readfds, &writefds, NULL, NULL) <= 0)
87             return;
88
89         if ( !FD_ISSET(dsi->socket, &readfds)) {
90             /* nothing waiting in the read queue */
91             return;
92         }
93         dsi_init_buffer(dsi);
94         len = dsi->end - dsi->eof;
95
96         if (len <= 0) {
97             /* ouch, our buffer is full ! 
98              * fall back to blocking IO 
99              * could block and disconnect but it's better than a cpu hog
100              */
101             dsi_block(dsi, 0);
102             return;
103         }
104
105         len = read(dsi->socket, dsi->eof, len);
106         if (len <= 0)
107             return;
108         dsi->eof += len;
109         if ( FD_ISSET(dsi->socket, &writefds)) {
110             return;
111         }
112     }
113 }
114
115 /* ------------------------------
116  * write raw data. return actual bytes read. checks against EINTR
117  * aren't necessary if all of the signals have SA_RESTART
118  * specified. */
119 size_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode _U_)
120 {
121   size_t written;
122   ssize_t len;
123 #if 0
124   /* FIXME sometime it's slower */
125   unsigned int flags = (mode)?MSG_MORE:0;
126 #endif
127   unsigned int flags = 0;
128
129 #if 0
130   /* XXX there's no MSG_DONTWAIT in recv ?? so we have to play with ioctl
131   */ 
132   if (dsi->noblocking) {
133       flags |= MSG_DONTWAIT;
134   }
135 #endif
136   
137   dsi->in_write++;
138   written = 0;
139   while (written < length) {
140     if ((-1 == (len = send(dsi->socket, (u_int8_t *) data + written,
141                       length - written, flags)) && errno == EINTR) ||
142         !len)
143       continue;
144
145     if (len < 0) {
146       if (dsi->noblocking && errno ==  EAGAIN) {
147          /* non blocking mode but will block 
148           * read data in input queue.
149           * 
150          */
151          dsi_buffer(dsi);
152       }
153       else {
154           LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
155           break;
156       }
157     }
158     else {
159         written += len;
160     }
161   }
162
163   dsi->write_count += written;
164   dsi->in_write--;
165   return written;
166 }
167
168 /* ---------------------------------
169 */
170 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
171 {
172     size_t nbe = 0;
173     
174     if (dsi->start) {        
175         nbe = dsi->eof - dsi->start;
176
177         if (nbe > 0) {
178            nbe = min((size_t)nbe, count);
179            memcpy(buf, dsi->start, nbe);
180            dsi->start += nbe;
181
182            if (dsi->eof == dsi->start) 
183                dsi->start = dsi->eof = dsi->buffer;
184
185         }
186     }
187     return nbe;
188 }
189
190 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
191 {
192     ssize_t nbe;
193     
194     if (!count)
195         return 0;
196
197     nbe = from_buf(dsi, buf, count);
198     if (nbe)
199         return nbe;
200   
201     return read(dsi->socket, buf, count);
202
203 }
204
205 /* ---------------------------------------
206  * read raw data. return actual bytes read. this will wait until 
207  * it gets length bytes 
208  */
209 size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
210 {
211   size_t stored;
212   ssize_t len;
213   
214   stored = 0;
215   while (stored < length) {
216     len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
217     if (len == -1 && errno == EINTR)
218       continue;
219     else if (len > 0)
220       stored += len;
221     else { /* eof or error */
222       /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
223       if (len || stored || dsi->read_count) {
224           LOG(log_error, logtype_default, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
225       }
226       break;
227     }
228   }
229
230   dsi->read_count += stored;
231   return stored;
232 }
233
234 /* ---------------------------------------
235  * read raw data. return actual bytes read. this will wait until 
236  * it gets length bytes 
237  */
238 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
239 {
240   size_t len;
241   size_t buflen;
242   
243   dsi_init_buffer(dsi);
244   len = from_buf(dsi, data, length);
245   dsi->read_count += len;
246   if (len == length) {
247       return len;
248   }
249   
250   buflen = min(8192, dsi->end - dsi->eof);
251   if (buflen > 0) {
252       ssize_t ret;
253       ret = read(dsi->socket, dsi->eof, buflen);
254       if (ret > 0)
255           dsi->eof += ret;
256   }
257   return dsi_stream_read(dsi, data, length -len);
258 }
259
260 /* ---------------------------------------
261 */
262 void dsi_sleep(DSI *dsi, const int state)
263 {
264     dsi->asleep = state;
265 }
266
267 /* ---------------------------------------
268 */
269 static void block_sig(DSI *dsi)
270 {
271   dsi->in_write++;
272   if (!dsi->sigblocked) sigprocmask(SIG_BLOCK, &dsi->sigblockset, &dsi->oldset);
273 }
274
275 /* ---------------------------------------
276 */
277 static void unblock_sig(DSI *dsi)
278 {
279   dsi->in_write--;
280   if (!dsi->sigblocked) sigprocmask(SIG_SETMASK, &dsi->oldset, NULL);
281 }
282
283 /* ---------------------------------------
284  * write data. 0 on failure. this assumes that dsi_len will never
285  * cause an overflow in the data buffer. 
286  */
287 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
288 {
289   char block[DSI_BLOCKSIZ];
290 #ifdef USE_WRITEV
291   struct iovec iov[2];
292   size_t towrite;
293   ssize_t len;
294 #endif /* USE_WRITEV */
295
296   block[0] = dsi->header.dsi_flags;
297   block[1] = dsi->header.dsi_command;
298   memcpy(block + 2, &dsi->header.dsi_requestID, 
299          sizeof(dsi->header.dsi_requestID));
300   memcpy(block + 4, &dsi->header.dsi_code, sizeof(dsi->header.dsi_code));
301   memcpy(block + 8, &dsi->header.dsi_len, sizeof(dsi->header.dsi_len));
302   memcpy(block + 12, &dsi->header.dsi_reserved,
303          sizeof(dsi->header.dsi_reserved));
304
305   if (!length) { /* just write the header */
306     length = (dsi_stream_write(dsi, block, sizeof(block), 0) == sizeof(block));
307     return length; /* really 0 on failure, 1 on success */
308   }
309   
310   /* block signals */
311   block_sig(dsi);
312 #ifdef USE_WRITEV
313   iov[0].iov_base = block;
314   iov[0].iov_len = sizeof(block);
315   iov[1].iov_base = buf;
316   iov[1].iov_len = length;
317   
318   towrite = sizeof(block) + length;
319   dsi->write_count += towrite;
320   while (towrite > 0) {
321     if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
322         !len)
323       continue;
324     
325     if ((size_t)len == towrite) /* wrote everything out */
326       break;
327     else if (len < 0) { /* error */
328       LOG(log_error, logtype_default, "dsi_stream_send: %s", strerror(errno));
329       unblock_sig(dsi);
330       return 0;
331     }
332     
333     towrite -= len;
334     if (towrite > length) { /* skip part of header */
335       iov[0].iov_base = (char *) iov[0].iov_base + len;
336       iov[0].iov_len -= len;
337     } else { /* skip to data */
338       if (iov[0].iov_len) {
339         len -= iov[0].iov_len;
340         iov[0].iov_len = 0;
341       }
342       iov[1].iov_base = (char *) iov[1].iov_base + len;
343       iov[1].iov_len -= len;
344     }
345   }
346   
347 #else /* USE_WRITEV */
348   /* write the header then data */
349   if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
350             (dsi_stream_write(dsi, buf, length, 0) != length)) {
351       unblock_sig(dsi);
352       return 0;
353   }
354 #endif /* USE_WRITEV */
355
356   unblock_sig(dsi);
357   return 1;
358 }
359
360
361 /* ---------------------------------------
362  * read data. function on success. 0 on failure. data length gets
363  * stored in length variable. this should really use size_t's, but
364  * that would require changes elsewhere. */
365 int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
366                        size_t *rlength)
367 {
368   char block[DSI_BLOCKSIZ];
369
370   /* read in the header */
371   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
372     return 0;
373
374   dsi->header.dsi_flags = block[0];
375   dsi->header.dsi_command = block[1];
376   /* FIXME, not the right place, 
377      but we get a server disconnect without reason in the log
378   */
379   if (!block[1]) {
380       LOG(log_error, logtype_default, "dsi_stream_receive: invalid packet, fatal");
381       return 0;
382   }
383
384   memcpy(&dsi->header.dsi_requestID, block + 2, 
385          sizeof(dsi->header.dsi_requestID));
386   memcpy(&dsi->header.dsi_code, block + 4, sizeof(dsi->header.dsi_code));
387   memcpy(&dsi->header.dsi_len, block + 8, sizeof(dsi->header.dsi_len));
388   memcpy(&dsi->header.dsi_reserved, block + 12,
389          sizeof(dsi->header.dsi_reserved));
390   dsi->clientID = ntohs(dsi->header.dsi_requestID);
391   
392   /* make sure we don't over-write our buffers. */
393   *rlength = min(ntohl(dsi->header.dsi_len), ilength);
394   if (dsi_stream_read(dsi, buf, *rlength) != *rlength) 
395     return 0;
396
397   return block[1];
398 }