]> arthur.barton.de Git - netatalk.git/blobdiff - libatalk/dsi/dsi_stream.c
remove the need for blocking SIGALRM (still need to deal with attention volumes changed)
[netatalk.git] / libatalk / dsi / dsi_stream.c
index fbbfbcc974b3d5b60cfd723331342c6a2b471b20..872ea1877af716610af35a1a2be70f90cf7a9328 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * $Id: dsi_stream.c,v 1.12 2005-04-28 20:50:02 bfernhomberg Exp $
+ * $Id: dsi_stream.c,v 1.15 2009-10-22 04:59:50 didg Exp $
  *
  * Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
  * All rights reserved. See COPYRIGHT.
 /* ------------------------- 
  * we don't use a circular buffer.
 */
-void dsi_buffer(DSI *dsi)
+static void dsi_init_buffer(DSI *dsi)
+{
+    if (!dsi->buffer) {
+        /* XXX config options */
+        dsi->maxsize = 6 * dsi->server_quantum;
+        if (!dsi->maxsize)
+            dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
+        dsi->buffer = malloc(dsi->maxsize);
+        if (!dsi->buffer) {
+            return;
+        }
+        dsi->start = dsi->buffer;
+        dsi->eof = dsi->buffer;
+        dsi->end = dsi->buffer + dsi->maxsize;
+    }
+}
+
+/* ---------------------- */
+static void dsi_buffer(DSI *dsi)
 {
     fd_set readfds, writefds;
     int    len;
@@ -72,21 +90,7 @@ void dsi_buffer(DSI *dsi)
             /* nothing waiting in the read queue */
             return;
         }
-        if (!dsi->buffer) {
-            /* XXX config options */
-            dsi->maxsize = 6 * dsi->server_quantum;
-            if (!dsi->maxsize)
-                dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
-            dsi->buffer = malloc(dsi->maxsize);
-            if (!dsi->buffer) {
-                /* fall back to blocking IO */
-                dsi_block(dsi, 0);
-                return;
-            }
-            dsi->start = dsi->buffer;
-            dsi->eof = dsi->buffer;
-            dsi->end = dsi->buffer + dsi->maxsize;
-        }
+        dsi_init_buffer(dsi);
         len = dsi->end - dsi->eof;
 
         if (len <= 0) {
@@ -130,6 +134,7 @@ size_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode _U_)
   }
 #endif
   
+  dsi->in_write++;
   written = 0;
   while (written < length) {
     if ((-1 == (len = send(dsi->socket, (u_int8_t *) data + written,
@@ -156,19 +161,16 @@ size_t dsi_stream_write(DSI *dsi, void *data, const size_t length, int mode _U_)
   }
 
   dsi->write_count += written;
+  dsi->in_write--;
   return written;
 }
 
 /* ---------------------------------
 */
-static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
+static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
 {
-    ssize_t nbe = 0;
-    ssize_t ret;
+    size_t nbe = 0;
     
-    if (!count)
-        return 0;
-        
     if (dsi->start) {        
         nbe = dsi->eof - dsi->start;
 
@@ -180,20 +182,24 @@ static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
            if (dsi->eof == dsi->start) 
                dsi->start = dsi->eof = dsi->buffer;
 
-           if (nbe == count)
-               return nbe;
-           count -= nbe;
-           buf += nbe;
         }
-        else 
-           nbe = 0;
     }
+    return nbe;
+}
+
+static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
+{
+    ssize_t nbe;
+    
+    if (!count)
+        return 0;
+
+    nbe = from_buf(dsi, buf, count);
+    if (nbe)
+        return nbe;
   
-    ret = read(dsi->socket, buf, count);
-    if (ret <= 0)
-        return ret;
+    return read(dsi->socket, buf, count);
 
-    return ret +nbe;
 }
 
 /* ---------------------------------------
@@ -225,6 +231,32 @@ size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
   return stored;
 }
 
+/* ---------------------------------------
+ * read raw data. return actual bytes read. this will wait until 
+ * it gets length bytes 
+ */
+static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
+{
+  size_t len;
+  size_t buflen;
+  
+  dsi_init_buffer(dsi);
+  len = from_buf(dsi, data, length);
+  dsi->read_count += len;
+  if (len == length) {
+      return len;
+  }
+  
+  buflen = min(8192, dsi->end - dsi->eof);
+  if (buflen > 0) {
+      ssize_t ret;
+      ret = read(dsi->socket, dsi->eof, buflen);
+      if (ret > 0)
+          dsi->eof += ret;
+  }
+  return dsi_stream_read(dsi, data, length -len);
+}
+
 /* ---------------------------------------
 */
 void dsi_sleep(DSI *dsi, const int state)
@@ -236,6 +268,7 @@ void dsi_sleep(DSI *dsi, const int state)
 */
 static void block_sig(DSI *dsi)
 {
+  dsi->in_write++;
   if (!dsi->sigblocked) sigprocmask(SIG_BLOCK, &dsi->sigblockset, &dsi->oldset);
 }
 
@@ -243,6 +276,7 @@ static void block_sig(DSI *dsi)
 */
 static void unblock_sig(DSI *dsi)
 {
+  dsi->in_write--;
   if (!dsi->sigblocked) sigprocmask(SIG_SETMASK, &dsi->oldset, NULL);
 }
 
@@ -288,7 +322,7 @@ int dsi_stream_send(DSI *dsi, void *buf, size_t length)
        !len)
       continue;
     
-    if (len == towrite) /* wrote everything out */
+    if ((size_t)len == towrite) /* wrote everything out */
       break;
     else if (len < 0) { /* error */
       LOG(log_error, logtype_default, "dsi_stream_send: %s", strerror(errno));
@@ -334,7 +368,7 @@ int dsi_stream_receive(DSI *dsi, void *buf, const size_t ilength,
   char block[DSI_BLOCKSIZ];
 
   /* read in the header */
-  if (dsi_stream_read(dsi, block, sizeof(block)) != sizeof(block)) 
+  if (dsi_buffered_stream_read(dsi, block, sizeof(block)) != sizeof(block)) 
     return 0;
 
   dsi->header.dsi_flags = block[0];