]> arthur.barton.de Git - netatalk.git/blobdiff - libatalk/dsi/dsi_stream.c
Merge branch 'master' of ssh://netatalk.git.sourceforge.net/gitroot/netatalk/netatalk
[netatalk.git] / libatalk / dsi / dsi_stream.c
index 73e8ba41949aabf411dfcb69f7af4f4368a58f3d..119d28ea4c95df415e5472d7da17a3c8e8c5dc91 100644 (file)
@@ -1,6 +1,4 @@
 /*
- * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
- *
  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
  * All rights reserved. See COPYRIGHT.
  *
@@ -15,8 +13,6 @@
 #include "config.h"
 #endif /* HAVE_CONFIG_H */
 
-#define USE_WRITEV
-
 #include <stdio.h>
 #include <stdlib.h>
 
 #include <errno.h>
 #include <sys/types.h>
 #include <sys/socket.h>
-
-#ifdef USE_WRITEV
 #include <sys/uio.h>
-#endif
 
 #include <atalk/logger.h>
 #include <atalk/dsi.h>
 #define MSG_DONTWAIT 0x40
 #endif
 
-/* ------------------------- 
- * we don't use a circular buffer.
-*/
-static void dsi_init_buffer(DSI *dsi)
-{
-    if (!dsi->buffer) {
-        /* XXX config options */
-        dsi->maxsize = 6 * dsi->server_quantum;
-        if (!dsi->maxsize)
-            dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
-        dsi->buffer = malloc(dsi->maxsize);
-        if (!dsi->buffer) {
-            return;
-        }
-        dsi->start = dsi->buffer;
-        dsi->eof = dsi->buffer;
-        dsi->end = dsi->buffer + dsi->maxsize;
-    }
-}
-
 /* ---------------------- 
    afpd is sleeping too much while trying to send something.
    May be there's no reader or the reader is also sleeping in write,
    look if there's some data for us to read, hopefully it will wake up
-   the reader
+   the reader so we can write again.
 */
-static int dsi_buffer(DSI *dsi)
+static int dsi_peek(DSI *dsi)
 {
     fd_set readfds, writefds;
     int    len;
     int    maxfd;
+    int    ret;
+
+    LOG(log_debug, logtype_dsi, "dsi_peek");
 
-    /* non blocking mode */
-    if (setnonblock(dsi->socket, 1) < 0) {
-        /* can't do it! exit without error it will sleep to death below */
-        LOG(log_error, logtype_default, "dsi_buffer: ioctl non blocking mode %s", strerror(errno));
-        return 0;
-    }
-    
     FD_ZERO(&readfds);
     FD_ZERO(&writefds);
     FD_SET( dsi->socket, &readfds);
     FD_SET( dsi->socket, &writefds);
     maxfd = dsi->socket +1;
+
     while (1) {
         FD_SET( dsi->socket, &readfds);
         FD_SET( dsi->socket, &writefds);
-        if (select( maxfd, &readfds, &writefds, NULL, NULL) <= 0)
-            break;
 
-        if ( !FD_ISSET(dsi->socket, &readfds)) {
-            /* nothing waiting in the read queue */
-            break;
+        /* No timeout: if there's nothing to read nor nothing to write,
+         * we've got nothing to do at all */
+        if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
+            if (ret == -1 && errno == EINTR)
+                /* we might have been interrupted by out timer, so restart select */
+                continue;
+            /* give up */
+            LOG(log_error, logtype_dsi, "dsi_peek: unexpected select return: %d %s",
+                ret, ret < 0 ? strerror(errno) : "");
+            return -1;
         }
-        dsi_init_buffer(dsi);
-        len = dsi->end - dsi->eof;
-
-        if (len <= 0) {
-            /* ouch, our buffer is full ! 
-             * fall back to blocking IO 
-             * could block and disconnect but it's better than a cpu hog
-             */
-            break;
+
+        /* Check if there's sth to read, hopefully reading that will unblock the client */
+        if (FD_ISSET(dsi->socket, &readfds)) {
+            len = dsi->end - dsi->eof;
+            LOG(log_note, logtype_dsi, "dsi_peek: used read buffer space: %d bytes", dsi->eof - dsi->buffer);
+
+            if (len <= 0) {
+                /* ouch, our buffer is full ! fall back to blocking IO 
+                 * could block and disconnect but it's better than a cpu hog */
+                LOG(log_warning, logtype_dsi, "dsi_peek: read buffer is full");
+                break;
+            }
+
+            if ((len = read(dsi->socket, dsi->eof, len)) <= 0) {
+                if (len == 0) {
+                    LOG(log_error, logtype_dsi, "dsi_peek: EOF");
+                    return -1;
+                }
+                LOG(log_error, logtype_dsi, "dsi_peek: read: %s", strerror(errno));
+                if (errno == EAGAIN)
+                    continue;
+                return -1;
+            }
+            LOG(log_debug, logtype_dsi, "dsi_peek: read %d bytes", len);
+
+            dsi->eof += len;
         }
 
-        len = read(dsi->socket, dsi->eof, len);
-        if (len <= 0)
-            break;
-        dsi->eof += len;
-        if ( FD_ISSET(dsi->socket, &writefds)) {
-            /* we can write again at last */
+        if (FD_ISSET(dsi->socket, &writefds)) {
+            /* we can write again */
+            LOG(log_debug, logtype_dsi, "dsi_peek: can write again");
             break;
         }
     }
-    if (setnonblock(dsi->socket, 0) < 0) {
-        /* can't do it! afpd will fail very quickly */
-        LOG(log_error, logtype_default, "dsi_buffer: ioctl blocking mode %s", strerror(errno));
-        return -1;
-    }
+
     return 0;
 }
 
@@ -138,51 +123,49 @@ ssize_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode)
 {
   size_t written;
   ssize_t len;
-#if 0
-  /* FIXME sometime it's slower */
-  unsigned int flags = (mode)?MSG_MORE:0;
-#endif
   unsigned int flags = 0;
 
-#if 0
-  /* XXX there's no MSG_DONTWAIT in recv ?? so we have to play with ioctl
-  */ 
-  flags |= MSG_DONTWAIT;
-#endif
-  
   dsi->in_write++;
   written = 0;
+
+  LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
+
   while (written < length) {
-    if ((-1 == (len = send(dsi->socket, (u_int8_t *) data + written,
-                     length - written, flags)) && errno == EINTR) ||
-       !len)
-      continue;
+      len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
+      if (len >= 0) {
+          written += len;
+          continue;
+      }
+
+      if (errno == EINTR)
+          continue;
 
-    if (len < 0) {
       if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          LOG(log_debug, logtype_dsi, "dsi_stream_write: send: %s", strerror(errno));
+
           if (mode == DSI_NOWAIT && written == 0) {
-              /* DSI_NOWAIT is used by attention
-                 give up in this case.
-              */
-              return -1;
+              /* DSI_NOWAIT is used by attention give up in this case. */
+              written = -1;
+              goto exit;
           }
-          if (dsi_buffer(dsi)) {
-              /* can't go back to blocking mode, exit, the next read
-                 will return with an error and afpd will die.
-              */
-              break;
+
+          /* Try to read sth. in order to break up possible deadlock */
+          if (dsi_peek(dsi) != 0) {
+              written = -1;
+              goto exit;
           }
+          /* Now try writing again */
           continue;
       }
-      LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
-      break;
-    }
-    else {
-        written += len;
-    }
+
+      LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
+      written = -1;
+      goto exit;
   }
 
   dsi->write_count += written;
+
+exit:
   dsi->in_write--;
   return written;
 }
@@ -196,6 +179,8 @@ ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t le
   size_t written;
   ssize_t len;
 
+  LOG(log_maxdebug, logtype_dsi, "dsi_stream_read_file: sending %u bytes", length);
+
   dsi->in_write++;
   written = 0;
 
@@ -209,7 +194,7 @@ ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t le
           return -1;
           
       if (errno == EAGAIN || errno == EWOULDBLOCK) {
-          if (dsi_buffer(dsi)) {
+          if (dsi_peek(dsi)) {
               /* can't go back to blocking mode, exit, the next read
                  will return with an error and afpd will die.
               */
@@ -217,7 +202,7 @@ ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t le
           }
           continue;
       }
-      LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
+      LOG(log_error, logtype_dsi, "dsi_stream_read_file: %s", strerror(errno));
       break;
     }
     else if (!len) {
@@ -241,6 +226,8 @@ ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t le
 static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
 {
     size_t nbe = 0;
+
+    LOG(log_maxdebug, logtype_dsi, "from_buf: %u bytes", count);
     
     if (dsi->start) {        
         nbe = dsi->eof - dsi->start;
@@ -255,6 +242,9 @@ static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
 
         }
     }
+
+    LOG(log_maxdebug, logtype_dsi, "from_buf(dead: %u, unread:%u , space left: %u): returning %u",
+        dsi->start - dsi->buffer, dsi->eof - dsi->start, dsi->end - dsi->eof, nbe);
     return nbe;
 }
 
@@ -268,16 +258,22 @@ static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
  */
 static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
 {
-    ssize_t nbe;
-    
+    ssize_t len;
+
+    LOG(log_maxdebug, logtype_dsi, "buf_read(%u bytes)", count);
+
     if (!count)
         return 0;
 
-    nbe = from_buf(dsi, buf, count); /* 1. */
-    if (nbe)
-        return nbe;             /* 2. */
+    len = from_buf(dsi, buf, count); /* 1. */
+    if (len)
+        return len;             /* 2. */
   
-    return read(dsi->socket, buf, count); /* 3. */
+    len = readt(dsi->socket, buf, count, 0, 1); /* 3. */
+
+    LOG(log_maxdebug, logtype_dsi, "buf_read(%u bytes): got: %d", count, len);
+
+    return len;
 }
 
 /*
@@ -288,45 +284,55 @@ size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
 {
   size_t stored;
   ssize_t len;
-  
+
+  LOG(log_maxdebug, logtype_dsi, "dsi_stream_read(%u bytes)", length);
+
   stored = 0;
   while (stored < length) {
-    len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
-    if (len == -1 && errno == EINTR)
-      continue;
-    else if (len > 0)
-      stored += len;
-    else { /* eof or error */
-      /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
-      if (len || stored || dsi->read_count) {
-          LOG(log_error, logtype_default, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
+      len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
+      if (len == -1 && (errno == EINTR || errno == EAGAIN)) {
+          LOG(log_debug, logtype_dsi, "dsi_stream_read: select read loop");
+          continue;
+      } else if (len > 0) {
+          stored += len;
+      } else { /* eof or error */
+          /* don't log EOF error if it's just after connect (OSX 10.3 probe) */
+          if (len || stored || dsi->read_count) {
+              if (! (dsi->flags & DSI_DISCONNECTED)) {
+                  LOG(log_error, logtype_dsi, "dsi_stream_read: len:%d, %s",
+                      len, (len < 0) ? strerror(errno) : "unexpected EOF");
+                  AFP_PANIC("FIXME");
+              }
+          }
+          break;
       }
-      break;
-    }
   }
 
   dsi->read_count += stored;
+
+  LOG(log_maxdebug, logtype_dsi, "dsi_stream_read(%u bytes): got: %u", length, stored);
   return stored;
 }
 
 /*
  * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
- * this tries to read larger chunks (8192 bytes) into a buffer.
+ * this tries to read larger chunks (65536 bytes) into a buffer.
  */
 static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
 {
   size_t len;
   size_t buflen;
+
+  LOG(log_maxdebug, logtype_dsi, "dsi_buffered_stream_read: %u bytes", length);
   
-  dsi_init_buffer(dsi);
   len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
   dsi->read_count += len;
   if (len == length) {          /* got enough bytes from there ? */
       return len;               /* yes */
   }
 
-  /* fill the buffer with 8192 bytes or until buffer is full */
-  buflen = min(8192, dsi->end - dsi->eof);
+  /* fill the buffer with 65536 bytes or until buffer is full */
+  buflen = min(65536, dsi->end - dsi->eof);
   if (buflen > 0) {
       ssize_t ret;
       ret = read(dsi->socket, dsi->eof, buflen);
@@ -339,13 +345,6 @@ static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t le
   return len;
 }
 
-/* ---------------------------------------
-*/
-void dsi_sleep(DSI *dsi, const int state)
-{
-    dsi->asleep = state;
-}
-
 /* ---------------------------------------
 */
 static void block_sig(DSI *dsi)
@@ -367,11 +366,12 @@ static void unblock_sig(DSI *dsi)
 int dsi_stream_send(DSI *dsi, void *buf, size_t length)
 {
   char block[DSI_BLOCKSIZ];
-#ifdef USE_WRITEV
   struct iovec iov[2];
   size_t towrite;
   ssize_t len;
-#endif /* USE_WRITEV */
+
+  LOG(log_maxdebug, logtype_dsi, "dsi_stream_send: %u bytes",
+      length ? length : sizeof(block));
 
   block[0] = dsi->header.dsi_flags;
   block[1] = dsi->header.dsi_command;
@@ -389,7 +389,6 @@ int dsi_stream_send(DSI *dsi, void *buf, size_t length)
   
   /* block signals */
   block_sig(dsi);
-#ifdef USE_WRITEV
   iov[0].iov_base = block;
   iov[0].iov_len = sizeof(block);
   iov[1].iov_base = buf;
@@ -398,46 +397,36 @@ int dsi_stream_send(DSI *dsi, void *buf, size_t length)
   towrite = sizeof(block) + length;
   dsi->write_count += towrite;
   while (towrite > 0) {
-    if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || 
-       !len)
-      continue;
+      if (((len = writev(dsi->socket, iov, 2)) == -1 && errno == EINTR) || (len == 0))
+          continue;
     
-    if ((size_t)len == towrite) /* wrote everything out */
-      break;
-    else if (len < 0) { /* error */
-      if (errno == EAGAIN || errno == EWOULDBLOCK) {
-          if (!dsi_buffer(dsi)) {
-              continue;
+      if ((size_t)len == towrite) /* wrote everything out */
+          break;
+      else if (len < 0) { /* error */
+          if (errno == EAGAIN || errno == EWOULDBLOCK) {
+              if (!dsi_peek(dsi)) {
+                  continue;
+              }
           }
+          LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
+          unblock_sig(dsi);
+          return 0;
       }
-      LOG(log_error, logtype_default, "dsi_stream_send: %s", strerror(errno));
-      unblock_sig(dsi);
-      return 0;
-    }
     
-    towrite -= len;
-    if (towrite > length) { /* skip part of header */
-      iov[0].iov_base = (char *) iov[0].iov_base + len;
-      iov[0].iov_len -= len;
-    } else { /* skip to data */
-      if (iov[0].iov_len) {
-       len -= iov[0].iov_len;
-       iov[0].iov_len = 0;
+      towrite -= len;
+      if (towrite > length) { /* skip part of header */
+          iov[0].iov_base = (char *) iov[0].iov_base + len;
+          iov[0].iov_len -= len;
+      } else { /* skip to data */
+          if (iov[0].iov_len) {
+              len -= iov[0].iov_len;
+              iov[0].iov_len = 0;
+          }
+          iov[1].iov_base = (char *) iov[1].iov_base + len;
+          iov[1].iov_len -= len;
       }
-      iov[1].iov_base = (char *) iov[1].iov_base + len;
-      iov[1].iov_len -= len;
-    }
   }
   
-#else /* USE_WRITEV */
-  /* write the header then data */
-  if ((dsi_stream_write(dsi, block, sizeof(block), 1) != sizeof(block)) ||
-            (dsi_stream_write(dsi, buf, length, 0) != length)) {
-      unblock_sig(dsi);
-      return 0;
-  }
-#endif /* USE_WRITEV */
-
   unblock_sig(dsi);
   return 1;
 }
@@ -452,6 +441,8 @@ int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
 {
   char block[DSI_BLOCKSIZ];
 
+  LOG(log_maxdebug, logtype_dsi, "dsi_stream_receive: %u bytes", ilength);
+
   /* read in the header */
   if (dsi_buffered_stream_read(dsi, (u_int8_t *)block, sizeof(block)) != sizeof(block)) 
     return 0;
@@ -462,7 +453,7 @@ int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
      but we get a server disconnect without reason in the log
   */
   if (!block[1]) {
-      LOG(log_error, logtype_default, "dsi_stream_receive: invalid packet, fatal");
+      LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
       return 0;
   }