/*
- * $Id: dsi_stream.c,v 1.18 2009-10-25 09:47:05 didg Exp $
+ * $Id: dsi_stream.c,v 1.20 2009-10-26 12:35:56 franklahm Exp $
*
* Copyright (c) 1998 Adrian Sun (asun@zoology.washington.edu)
* All rights reserved. See COPYRIGHT.
#endif
#include <atalk/logger.h>
-
#include <atalk/dsi.h>
#include <netatalk/endian.h>
#include <atalk/util.h>
-#include <sys/ioctl.h>
#define min(a,b) ((a) < (b) ? (a) : (b))
afpd is sleeping too much while trying to send something.
May be there's no reader or the reader is also sleeping in write,
look if there's some data for us to read, hopefully it will wake up
- the reader
+ the reader so we can write again.
*/
-static int dsi_buffer(DSI *dsi)
+static int dsi_peek(DSI *dsi)
{
fd_set readfds, writefds;
int len;
int maxfd;
- int adr;
+ int ret;
- /* non blocking mode */
- adr = 1;
- if (ioctl(dsi->socket, FIONBIO, &adr) < 0) {
- /* can't do it! exit without error it will sleep to death below */
- LOG(log_error, logtype_default, "dsi_buffer: ioctl non blocking mode %s", strerror(errno));
- return 0;
- }
-
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_SET( dsi->socket, &readfds);
FD_SET( dsi->socket, &writefds);
maxfd = dsi->socket +1;
+
while (1) {
FD_SET( dsi->socket, &readfds);
FD_SET( dsi->socket, &writefds);
- if (select( maxfd, &readfds, &writefds, NULL, NULL) <= 0)
- break;
- if ( !FD_ISSET(dsi->socket, &readfds)) {
- /* nothing waiting in the read queue */
+ /* No timeout: if there's nothing to read nor nothing to write,
+ * we've got nothing to do at all */
+ if ((ret = select( maxfd, &readfds, &writefds, NULL, NULL)) <= 0) {
+ if (ret == -1 && errno == EINTR)
+ /* we might have been interrupted by out timer, so restart select */
+ continue;
+ /* give up */
break;
}
- dsi_init_buffer(dsi);
- len = dsi->end - dsi->eof;
-
- if (len <= 0) {
- /* ouch, our buffer is full !
- * fall back to blocking IO
- * could block and disconnect but it's better than a cpu hog
- */
- break;
+
+ /* Check if there's sth to read, hopefully reading that will unblock the client */
+ if (FD_ISSET(dsi->socket, &readfds)) {
+ dsi_init_buffer(dsi);
+ len = dsi->end - dsi->eof;
+
+ if (len <= 0) {
+ /* ouch, our buffer is full ! fall back to blocking IO
+ * could block and disconnect but it's better than a cpu hog */
+ break;
+ }
+
+ len = read(dsi->socket, dsi->eof, len);
+ if (len <= 0)
+ break;
+ dsi->eof += len;
}
- len = read(dsi->socket, dsi->eof, len);
- if (len <= 0)
- break;
- dsi->eof += len;
- if ( FD_ISSET(dsi->socket, &writefds)) {
+ if (FD_ISSET(dsi->socket, &writefds))
/* we can write again at last */
break;
- }
- }
- adr = 0;
- if (ioctl(dsi->socket, FIONBIO, &adr) < 0) {
- /* can't do it! afpd will fail very quickly */
- LOG(log_error, logtype_default, "dsi_buffer: ioctl blocking mode %s", strerror(errno));
- return -1;
}
+
return 0;
}
{
size_t written;
ssize_t len;
-#if 0
- /* FIXME sometime it's slower */
- unsigned int flags = (mode)?MSG_MORE:0;
-#endif
unsigned int flags = 0;
-#if 0
- /* XXX there's no MSG_DONTWAIT in recv ?? so we have to play with ioctl
- */
- flags |= MSG_DONTWAIT;
-#endif
-
dsi->in_write++;
written = 0;
+
+ LOG(log_maxdebug, logtype_dsi, "dsi_stream_write: sending %u bytes", length);
+
+ /* non blocking mode */
+ if (setnonblock(dsi->socket, 1) < 0) {
+ LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
+ return -1;
+ }
+
while (written < length) {
- if ((-1 == (len = send(dsi->socket, (u_int8_t *) data + written,
- length - written, flags)) && errno == EINTR) ||
- !len)
- continue;
+ len = send(dsi->socket, (u_int8_t *) data + written, length - written, flags);
+ if (len >= 0) {
+ written += len;
+ continue;
+ }
+
+ if (errno == EINTR)
+ continue;
- if (len < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (mode == DSI_NOWAIT && written == 0) {
- /* DSI_NOWAIT is used by attention
- give up in this case.
- */
- return -1;
+ /* DSI_NOWAIT is used by attention give up in this case. */
+ written = -1;
+ goto exit;
}
- if (dsi_buffer(dsi)) {
- /* can't go back to blocking mode, exit, the next read
- will return with an error and afpd will die.
- */
- break;
+
+ /* Try to read sth. in order to break up possible deadlock */
+ if (dsi_peek(dsi) != 0) {
+ written = -1;
+ goto exit;
}
+ /* Now try writing again */
continue;
}
- LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
- break;
- }
- else {
- written += len;
- }
+
+ LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
+ written = -1;
+ goto exit;
}
dsi->write_count += written;
+
+exit:
+ if (setnonblock(dsi->socket, 0) < 0) {
+ LOG(log_error, logtype_dsi, "dsi_stream_write: setnonblock: %s", strerror(errno));
+ written = -1;
+ }
+
dsi->in_write--;
return written;
}
/* ---------------------------------
*/
+#ifdef WITH_SENDFILE
ssize_t dsi_stream_read_file(DSI *dsi, int fromfd, off_t offset, const size_t length)
{
size_t written;
dsi->in_write++;
written = 0;
+ /* non blocking mode */
+ if (setnonblock(dsi->socket, 1) < 0) {
+ LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
+ return -1;
+ }
+
while (written < length) {
len = sys_sendfile(dsi->socket, fromfd, &offset, length - written);
return -1;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
- if (dsi_buffer(dsi)) {
+ if (dsi_peek(dsi)) {
/* can't go back to blocking mode, exit, the next read
will return with an error and afpd will die.
*/
}
continue;
}
- LOG(log_error, logtype_default, "dsi_stream_write: %s", strerror(errno));
+ LOG(log_error, logtype_dsi, "dsi_stream_write: %s", strerror(errno));
break;
}
else if (!len) {
written += len;
}
+ if (setnonblock(dsi->socket, 0) < 0) {
+ LOG(log_error, logtype_dsi, "dsi_stream_read_file: setnonblock: %s", strerror(errno));
+ return -1;
+ }
+
dsi->write_count += written;
dsi->in_write--;
return written;
}
+#endif
-/* ---------------------------------
-*/
+/*
+ * Return all bytes up to count from dsi->buffer if there are any buffered there
+ */
static size_t from_buf(DSI *dsi, u_int8_t *buf, size_t count)
{
size_t nbe = 0;
return nbe;
}
+/*
+ * Get bytes from buffer dsi->buffer or read from socket
+ *
+ * 1. Check if there are bytes in the the dsi->buffer buffer.
+ * 2. Return bytes from (1) if yes.
+ * Note: this may return fewer bytes then requested in count !!
+ * 3. If the buffer was empty, read from the socket.
+ */
static ssize_t buf_read(DSI *dsi, u_int8_t *buf, size_t count)
{
ssize_t nbe;
if (!count)
return 0;
- nbe = from_buf(dsi, buf, count);
+ nbe = from_buf(dsi, buf, count); /* 1. */
if (nbe)
- return nbe;
+ return nbe; /* 2. */
- return read(dsi->socket, buf, count);
-
+ return read(dsi->socket, buf, count); /* 3. */
}
-/* ---------------------------------------
- * read raw data. return actual bytes read. this will wait until
- * it gets length bytes
+/*
+ * Essentially a loop around buf_read() to ensure "length" bytes are read
+ * from dsi->buffer and/or the socket.
*/
size_t dsi_stream_read(DSI *dsi, void *data, const size_t length)
{
size_t stored;
ssize_t len;
-
+
stored = 0;
while (stored < length) {
len = buf_read(dsi, (u_int8_t *) data + stored, length - stored);
- if (len == -1 && errno == EINTR)
+ if (len == -1 && errno == EINTR) {
continue;
- else if (len > 0)
+ } else if (len > 0) {
stored += len;
- else { /* eof or error */
+ } else { /* eof or error */
/* don't log EOF error if it's just after connect (OSX 10.3 probe) */
if (len || stored || dsi->read_count) {
- LOG(log_error, logtype_default, "dsi_stream_read(%d): %s", len, (len < 0)?strerror(errno):"unexpected EOF");
+ if (! (dsi->flags & DSI_DISCONNECTED))
+ LOG(log_error, logtype_dsi, "dsi_stream_read(fd: %i): len:%d, %s",
+ dsi->socket, len, (len < 0) ? strerror(errno) : "unexpected EOF");
}
break;
}
return stored;
}
-/* ---------------------------------------
- * read raw data. return actual bytes read. this will wait until
- * it gets length bytes
+/*
+ * Get "length" bytes from buffer and/or socket. In order to avoid frequent small reads
+ * this tries to read larger chunks (8192 bytes) into a buffer.
*/
static size_t dsi_buffered_stream_read(DSI *dsi, u_int8_t *data, const size_t length)
{
size_t buflen;
dsi_init_buffer(dsi);
- len = from_buf(dsi, data, length);
+ len = from_buf(dsi, data, length); /* read from buffer dsi->buffer */
dsi->read_count += len;
- if (len == length) {
- return len;
+ if (len == length) { /* got enough bytes from there ? */
+ return len; /* yes */
}
-
+
+ /* fill the buffer with 8192 bytes or until buffer is full */
buflen = min(8192, dsi->end - dsi->eof);
if (buflen > 0) {
ssize_t ret;
if (ret > 0)
dsi->eof += ret;
}
- return dsi_stream_read(dsi, data, length -len);
+
+ /* now get the remaining data */
+ len += dsi_stream_read(dsi, data + len, length - len);
+ return len;
}
/* ---------------------------------------
*/
void dsi_sleep(DSI *dsi, const int state)
{
- dsi->asleep = state;
+ if (state)
+ dsi->flags |= DSI_SLEEPING;
+ else
+ dsi->flags &= ~DSI_SLEEPING;
}
/* ---------------------------------------
break;
else if (len < 0) { /* error */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
- if (!dsi_buffer(dsi)) {
+ if (!dsi_peek(dsi)) {
continue;
}
}
- LOG(log_error, logtype_default, "dsi_stream_send: %s", strerror(errno));
+ LOG(log_error, logtype_dsi, "dsi_stream_send: %s", strerror(errno));
unblock_sig(dsi);
return 0;
}
but we get a server disconnect without reason in the log
*/
if (!block[1]) {
- LOG(log_error, logtype_default, "dsi_stream_receive: invalid packet, fatal");
+ LOG(log_error, logtype_dsi, "dsi_stream_receive: invalid packet, fatal");
return 0;
}