/*
- * $Id: dsi_stream.c,v 1.12 2005-04-28 20:50:02 bfernhomberg Exp $
+ * $Id: dsi_stream.c,v 1.15 2009-10-22 04:59:50 didg Exp $
*
* Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
* All rights reserved. See COPYRIGHT.
/* -------------------------
* we don't use a circular buffer.
*/
-void dsi_buffer(DSI *dsi)
+static void dsi_init_buffer(DSI *dsi)
+{
+ if (!dsi->buffer) {
+ /* XXX config options */
+ dsi->maxsize = 6 * dsi->server_quantum;
+ if (!dsi->maxsize)
+ dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
+ dsi->buffer = malloc(dsi->maxsize);
+ if (!dsi->buffer) {
+ return;
+ }
+ dsi->start = dsi->buffer;
+ dsi->eof = dsi->buffer;
+ dsi->end = dsi->buffer + dsi->maxsize;
+ }
+}
+
+/* ---------------------- */
+static void dsi_buffer(DSI *dsi)
{
fd_set readfds, writefds;
int len;
/* nothing waiting in the read queue */
return;
}
- if (!dsi->buffer) {
- /* XXX config options */
- dsi->maxsize = 6 * dsi->server_quantum;
- if (!dsi->maxsize)
- dsi->maxsize = 6 * DSI_SERVQUANT_DEF;
- dsi->buffer = malloc(dsi->maxsize);
- if (!dsi->buffer) {
- /* fall back to blocking IO */
- dsi_block(dsi, 0);
- return;
- }
- dsi->start = dsi->buffer;
- dsi->eof = dsi->buffer;
- dsi->end = dsi->buffer + dsi->maxsize;
- }
+ dsi_init_buffer(dsi);
len = dsi->end - dsi->eof;
if (len <= 0) {
}
#endif
+ dsi->in_write++;
written = 0;
while (written < length) {
if ((-1 == (len = send(dsi->socket, (u_int8_t *) data + written,
}
dsi->write_count += written;
+ dsi->in_write--;
return written;
}
/* ---------------------------------
*/
-static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
+static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
{
- ssize_t nbe = 0;
- ssize_t ret;
+ size_t nbe = 0;
- if (!count)
- return 0;
-
if (dsi->start) {
nbe = dsi->eof - dsi->start;
if (dsi->eof == dsi->start)
dsi->start = dsi->eof = dsi->buffer;
- if (nbe == count)
- return nbe;
- count -= nbe;
- buf += nbe;
}
- else
- nbe = 0;
}
+ return nbe;
+}
+
+static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
+{
+ ssize_t nbe;
+
+ if (!count)
+ return 0;
+
+ nbe = from_buf(dsi, buf, count);
+ if (nbe)
+ return nbe;
- ret = read(dsi->socket, buf, count);
- if (ret <= 0)
- return ret;
+ return read(dsi->socket, buf, count);
- return ret +nbe;
}
/* ---------------------------------------
return stored;
}
+/* ---------------------------------------
+ * read raw data. return actual bytes read. this will wait until
+ * it gets length bytes
+ */
+static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
+{
+ size_t len;
+ size_t buflen;
+
+ dsi_init_buffer(dsi);
+ len = from_buf(dsi, data, length);
+ dsi->read_count += len;
+ if (len == length) {
+ return len;
+ }
+
+ buflen = min(8192, dsi->end - dsi->eof);
+ if (buflen > 0) {
+ ssize_t ret;
+ ret = read(dsi->socket, dsi->eof, buflen);
+ if (ret > 0)
+ dsi->eof += ret;
+ }
+ return dsi_stream_read(dsi, data, length -len);
+}
+
/* ---------------------------------------
*/
void dsi_sleep(DSI *dsi, const int state)
*/
static void block_sig(DSI *dsi)
{
+ dsi->in_write++;
if (!dsi->sigblocked) sigprocmask(SIG_BLOCK, &dsi->sigblockset, &dsi->oldset);
}
*/
static void unblock_sig(DSI *dsi)
{
+ dsi->in_write--;
if (!dsi->sigblocked) sigprocmask(SIG_SETMASK, &dsi->oldset, NULL);
}
!len)
continue;
- if (len == towrite) /* wrote everything out */
+ if ((size_t)len == towrite) /* wrote everything out */
break;
else if (len < 0) { /* error */
LOG(log_error, logtype_default, "dsi_stream_send: %s", strerror(errno));
char block[DSI_BLOCKSIZ];
/* read in the header */
- if (dsi_stream_read(dsi, block, sizeof(block)) != sizeof(block))
+ if (dsi_buffered_stream_read(dsi, block, sizeof(block)) != sizeof(block))
return 0;
dsi->header.dsi_flags = block[0];