void (*init_routine) (void);
void *(*start_routine) (void *);
-};
-
-struct netdata_static_thread static_threads[] = {
- {"tc", "plugins", "tc", 1, NULL, NULL, tc_main},
- {"idlejitter", "plugins", "idlejitter", 1, NULL, NULL, cpuidlejitter_main},
- {"proc", "plugins", "proc", 1, NULL, NULL, proc_main},
- {"cgroups", "plugins", "cgroups", 1, NULL, NULL, cgroups_main},
-
+} static_threads[] = {
#ifdef INTERNAL_PLUGIN_NFACCT
- // nfacct requires root access
+// nfacct requires root access
// so, we build it as an external plugin with setuid to root
- {"nfacct", "plugins", "nfacct", 1, NULL, NULL, nfacct_main},
+ {"nfacct", "plugins", "nfacct", 1, NULL, NULL, nfacct_main},
#endif
- {"plugins.d", NULL, NULL, 1, NULL, NULL, pluginsd_main},
- {"check", "plugins", "checks", 0, NULL, NULL, checks_main},
- {"web", NULL, NULL, 1, NULL, NULL, socket_listen_main},
- {NULL, NULL, NULL, 0, NULL, NULL, NULL}
+ {"tc", "plugins", "tc", 1, NULL, NULL, tc_main},
+ {"idlejitter", "plugins", "idlejitter", 1, NULL, NULL, cpuidlejitter_main},
+ {"proc", "plugins", "proc", 1, NULL, NULL, proc_main},
+ {"cgroups", "plugins", "cgroups", 1, NULL, NULL, cgroups_main},
+ {"plugins.d", NULL, NULL, 1, NULL, NULL, pluginsd_main},
+ {"check", "plugins", "checks", 0, NULL, NULL, checks_main},
+ {"web", NULL, NULL, 1, NULL, NULL, socket_listen_main_multi_threaded},
+ {"web-single-threaded", NULL, NULL, 0, NULL, NULL, socket_listen_main_single_threaded},
+ {NULL, NULL, NULL, 0, NULL, NULL, NULL}
};
+void web_server_threading_selection(void) {
+ int threaded = config_get_boolean("global", "multi threaded web server", 1);
+
+ int i;
+ for(i = 0; static_threads[i].name ; i++) {
+ if(static_threads[i].start_routine == socket_listen_main_multi_threaded)
+ static_threads[i].enabled = threaded?1:0;
+
+ if(static_threads[i].start_routine == socket_listen_main_single_threaded)
+ static_threads[i].enabled = threaded?0:1;
+ }
+}
+
+
int killpid(pid_t pid, int sig)
{
int ret = -1;
// --------------------------------------------------------------------
- listen_backlog = (int) config_get_number("global", "http port listen backlog", LISTEN_BACKLOG);
-
- listen_port = (int) config_get_number("global", "port", LISTEN_PORT);
- if(listen_port < 1 || listen_port > 65535) {
- info("Invalid listen port %d given. Defaulting to %d.", listen_port, LISTEN_PORT);
- listen_port = LISTEN_PORT;
- }
- else debug(D_OPTIONS, "Listen port set to %d.", listen_port);
-
- int ip = 0;
- char *ipv = config_get("global", "ip version", "any");
- if(!strcmp(ipv, "any") || !strcmp(ipv, "both") || !strcmp(ipv, "all")) ip = 0;
- else if(!strcmp(ipv, "ipv4") || !strcmp(ipv, "IPV4") || !strcmp(ipv, "IPv4") || !strcmp(ipv, "4")) ip = 4;
- else if(!strcmp(ipv, "ipv6") || !strcmp(ipv, "IPV6") || !strcmp(ipv, "IPv6") || !strcmp(ipv, "6")) ip = 6;
- else info("Cannot understand ip version '%s'. Assuming 'any'.", ipv);
-
- if(ip == 0 || ip == 6) listen_fd = create_listen_socket6(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
- if(listen_fd < 0) {
- listen_fd = create_listen_socket4(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
- if(listen_fd >= 0 && ip != 4) info("Managed to open an IPv4 socket on port %d.", listen_port);
- }
-
+ listen_fd = create_listen_socket();
if(listen_fd < 0) fatal("Cannot listen socket.");
}
// ------------------------------------------------------------------------
// spawn the threads
+ web_server_threading_selection();
+
for (i = 0; static_threads[i].name != NULL ; i++) {
struct netdata_static_thread *st = &static_threads[i];
w->stats_received_bytes = 0;
w->stats_sent_bytes = 0;
- long sent = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
+ size_t sent = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
#ifdef NETDATA_WITH_ZLIB
- if(likely(w->response.zoutput)) sent = (long)w->response.zstream.total_out;
+ if(likely(w->response.zoutput)) sent = (size_t)w->response.zstream.total_out;
#endif
- long size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
+ size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
if(likely(w->last_url[0]))
- log_access("%llu: (sent/all = %ld/%ld bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %s: %d '%s'",
+ log_access("%llu: (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %s: %d '%s'",
w->id,
sent, size, -((size>0)?((float)(size-sent)/(float)size * 100.0):0.0),
(float)usecdiff(&w->tv_ready, &w->tv_in) / 1000.0,
);
if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) {
- debug(D_WEB_CLIENT, "%llu: Closing filecopy input file.", w->id);
- close(w->ifd);
- w->ifd = w->ofd;
+ if(w->ifd != w->ofd) {
+ debug(D_WEB_CLIENT, "%llu: Closing filecopy input file descriptor %d.", w->id, w->ifd);
+ close(w->ifd);
+ w->ifd = w->ofd;
+ }
}
w->last_url[0] = '\0';
w->origin[1] = '\0';
w->mode = WEB_CLIENT_MODE_NORMAL;
+
w->keepalive = 0;
w->decoded_url[0] = '\0';
struct web_client *web_client_free(struct web_client *w)
{
struct web_client *n = w->next;
+ if(w == web_clients) web_clients = n;
debug(D_WEB_CLIENT_ACCESS, "%llu: Closing web client from %s port %s.", w->id, w->client_ip, w->client_port);
if(w->prev) w->prev->next = w->next;
if(w->next) w->next->prev = w->prev;
-
- if(w == web_clients) web_clients = w->next;
-
if(w->response.header_output) buffer_free(w->response.header_output);
if(w->response.header) buffer_free(w->response.header);
if(w->response.data) buffer_free(w->response.data);
- close(w->ifd);
- if(w->ofd != w->ifd) close(w->ofd);
+ if(w->ifd != -1) close(w->ifd);
+ if(w->ofd != -1 && w->ofd != w->ifd) close(w->ofd);
free(w);
global_statistics.connected_clients--;
#ifdef NETDATA_WITH_ZLIB
void web_client_enable_deflate(struct web_client *w, int gzip) {
- if(w->response.zinitialized == 1) {
+ if(w->response.zinitialized) {
error("%llu: Compression has already be initialized for this client.", w->id);
return;
}
buffer_strcat(w->response.header_output, "\r\n");
- // disable TCP_NODELAY, to buffer the header
+/* // disable TCP_NODELAY, to buffer the header
int flag = 0;
if(setsockopt(w->ofd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0)
error("%llu: failed to disable TCP_NODELAY on socket.", w->id);
-
+*/
// sent the HTTP header
debug(D_WEB_DATA, "%llu: Sending response HTTP header of size %d: '%s'"
, w->id
);
bytes = send(w->ofd, buffer_tostring(w->response.header_output), buffer_strlen(w->response.header_output), 0);
- if(bytes != (ssize_t) buffer_strlen(w->response.header_output))
- error("%llu: HTTP Header failed to be sent (I sent %d bytes but the system sent %d bytes)."
- , w->id
- , buffer_strlen(w->response.header_output)
- , bytes);
+ if(bytes != (ssize_t) buffer_strlen(w->response.header_output)) {
+ if(bytes > 0)
+ w->stats_sent_bytes += bytes;
+
+ error("%llu: HTTP Header failed to be sent (I sent %d bytes but the system sent %d bytes). Closing web client.", w->id,
+ buffer_strlen(w->response.header_output), bytes);
+
+ WEB_CLIENT_IS_DEAD(w);
+ return;
+ }
else
w->stats_sent_bytes += bytes;
- // enable TCP_NODELAY, to send all data immediately at the next send()
+/* // enable TCP_NODELAY, to send all data immediately at the next send()
flag = 1;
if(setsockopt(w->ofd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0) error("%llu: failed to enable TCP_NODELAY on socket.", w->id);
-
+*/
// enable sending immediately if we have data
if(w->response.data->len) w->wait_send = 1;
else w->wait_send = 0;
}
}
-long web_client_send_chunk_header(struct web_client *w, long len)
+ssize_t web_client_send_chunk_header(struct web_client *w, size_t len)
{
debug(D_DEFLATE, "%llu: OPEN CHUNK of %d bytes (hex: %x).", w->id, len, len);
char buf[1024];
sprintf(buf, "%lX\r\n", len);
- ssize_t bytes = send(w->ofd, buf, strlen(buf), MSG_DONTWAIT);
+
+ ssize_t bytes = send(w->ofd, buf, strlen(buf), 0);
+ if(bytes > 0) {
+ debug(D_DEFLATE, "%llu: Sent chunk header %d bytes.", w->id, bytes);
+ w->stats_sent_bytes += bytes;
+ }
- if(bytes > 0) debug(D_DEFLATE, "%llu: Sent chunk header %d bytes.", w->id, bytes);
- else if(bytes == 0) debug(D_DEFLATE, "%llu: Did not send chunk header to the client.", w->id);
- else debug(D_DEFLATE, "%llu: Failed to send chunk header to client.", w->id);
+ else if(bytes == 0) {
+ error("%llu: Did not send chunk header to the client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
+ else {
+ error("%llu: Failed to send chunk header to client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
return bytes;
}
-long web_client_send_chunk_close(struct web_client *w)
+ssize_t web_client_send_chunk_close(struct web_client *w)
{
//debug(D_DEFLATE, "%llu: CLOSE CHUNK.", w->id);
- ssize_t bytes = send(w->ofd, "\r\n", 2, MSG_DONTWAIT);
+ ssize_t bytes = send(w->ofd, "\r\n", 2, 0);
+ if(bytes > 0) {
+ debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
+ w->stats_sent_bytes += bytes;
+ }
- if(bytes > 0) debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
- else if(bytes == 0) debug(D_DEFLATE, "%llu: Did not send chunk suffix to the client.", w->id);
- else debug(D_DEFLATE, "%llu: Failed to send chunk suffix to client.", w->id);
+ else if(bytes == 0) {
+ error("%llu: Did not send chunk suffix to the client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
+ else {
+ error("%llu: Failed to send chunk suffix to client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
return bytes;
}
-long web_client_send_chunk_finalize(struct web_client *w)
+ssize_t web_client_send_chunk_finalize(struct web_client *w)
{
//debug(D_DEFLATE, "%llu: FINALIZE CHUNK.", w->id);
- ssize_t bytes = send(w->ofd, "\r\n0\r\n\r\n", 7, MSG_DONTWAIT);
+ ssize_t bytes = send(w->ofd, "\r\n0\r\n\r\n", 7, 0);
+ if(bytes > 0) {
+ debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
+ w->stats_sent_bytes += bytes;
+ }
- if(bytes > 0) debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
- else if(bytes == 0) debug(D_DEFLATE, "%llu: Did not send chunk suffix to the client.", w->id);
- else debug(D_DEFLATE, "%llu: Failed to send chunk suffix to client.", w->id);
+ else if(bytes == 0) {
+ error("%llu: Did not send chunk suffix to the client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
+ else {
+ error("%llu: Failed to send chunk suffix to client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
return bytes;
}
#ifdef NETDATA_WITH_ZLIB
-long web_client_send_deflate(struct web_client *w)
+ssize_t web_client_send_deflate(struct web_client *w)
{
- long len = 0, t = 0;
+ ssize_t len = 0, t = 0;
// when using compression,
// w->response.sent is the amount of bytes passed through compression
debug(D_WEB_CLIENT, "%llu: Out of output data.", w->id);
// finalize the chunk
- if(w->response.sent != 0)
- t += web_client_send_chunk_finalize(w);
-
- // there can be two cases for this
- // A. we have done everything
- // B. we temporarily have nothing to send, waiting for the buffer to be filled by ifd
+ if(w->response.sent != 0) {
+ t = web_client_send_chunk_finalize(w);
+ if(t < 0) return t;
+ }
- if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->ifd != w->ofd && w->response.rlen && w->response.rlen > w->response.data->len) {
+ if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->response.rlen && w->response.rlen > w->response.data->len) {
// we have to wait, more data will come
debug(D_WEB_CLIENT, "%llu: Waiting for more data to become available.", w->id);
w->wait_send = 0;
- return(0);
+ return t;
}
- if(w->keepalive == 0) {
+ if(unlikely(w->keepalive == 0)) {
debug(D_WEB_CLIENT, "%llu: Closing (keep-alive is not enabled). %ld bytes sent.", w->id, w->response.sent);
errno = 0;
return(-1);
// reset the client
web_client_reset(w);
- debug(D_WEB_CLIENT, "%llu: Done sending all data on socket. Waiting for next request on the same socket.", w->id);
- return(0);
+ debug(D_WEB_CLIENT, "%llu: Done sending all data on socket.", w->id);
+ return t;
}
if(w->response.zhave == w->response.zsent) {
// compress more input data
// close the previous open chunk
- if(w->response.sent != 0) t += web_client_send_chunk_close(w);
+ if(w->response.sent != 0) {
+ t = web_client_send_chunk_close(w);
+ if(t < 0) return t;
+ }
debug(D_DEFLATE, "%llu: Compressing %d new bytes starting from %d (and %d left behind).", w->id, (w->response.data->len - w->response.sent), w->response.sent, w->response.zstream.avail_in);
// give the compressor all the data not passed through the compressor yet
if(w->response.data->len > w->response.sent) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if((long)w->response.sent - (long)w->response.zstream.avail_in < 0)
- error("internal error: avail_in is corrupted.");
-#endif
w->response.zstream.next_in = (Bytef *)&w->response.data->buffer[w->response.sent - w->response.zstream.avail_in];
w->response.zstream.avail_in += (uInt) (w->response.data->len - w->response.sent);
}
debug(D_DEFLATE, "%llu: Compression produced %d bytes.", w->id, w->response.zhave);
// open a new chunk
- t += web_client_send_chunk_header(w, w->response.zhave);
+ ssize_t t2 = web_client_send_chunk_header(w, w->response.zhave);
+ if(t2 < 0) return t2;
+ t += t2;
}
debug(D_WEB_CLIENT, "%llu: Sending %d bytes of data (+%d of chunk header).", w->id, w->response.zhave - w->response.zsent, t);
len = send(w->ofd, &w->response.zbuffer[w->response.zsent], (size_t) (w->response.zhave - w->response.zsent), MSG_DONTWAIT);
if(len > 0) {
+ w->stats_sent_bytes += len;
w->response.zsent += len;
- if(t > 0) len += t;
+ len += t;
debug(D_WEB_CLIENT, "%llu: Sent %d bytes.", w->id, len);
}
- else if(len == 0) debug(D_WEB_CLIENT, "%llu: Did not send any bytes to the client (zhave = %ld, zsent = %ld, need to send = %ld).", w->id, w->response.zhave, w->response.zsent, w->response.zhave - w->response.zsent);
- else debug(D_WEB_CLIENT, "%llu: Failed to send data to client. Reason: %s", w->id, strerror(errno));
+ else if(len == 0) {
+ error("%llu: Did not send any bytes to the client (zhave = %ld, zsent = %ld, need to send = %ld).", w->id, w->response.zhave, w->response.zsent, w->response.zhave - w->response.zsent);
+ WEB_CLIENT_IS_DEAD(w);
+ }
+ else {
+ error("%llu: Failed to send data to client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
return(len);
}
#endif // NETDATA_WITH_ZLIB
-long web_client_send(struct web_client *w)
-{
+ssize_t web_client_send(struct web_client *w) {
#ifdef NETDATA_WITH_ZLIB
if(likely(w->response.zoutput)) return web_client_send_deflate(w);
#endif // NETDATA_WITH_ZLIB
- long bytes;
+ ssize_t bytes;
if(unlikely(w->response.data->len - w->response.sent == 0)) {
// there is nothing to send
// A. we have done everything
// B. we temporarily have nothing to send, waiting for the buffer to be filled by ifd
- if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->ifd != w->ofd && w->response.rlen && w->response.rlen > w->response.data->len) {
+ if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->response.rlen && w->response.rlen > w->response.data->len) {
// we have to wait, more data will come
debug(D_WEB_CLIENT, "%llu: Waiting for more data to become available.", w->id);
w->wait_send = 0;
bytes = send(w->ofd, &w->response.data->buffer[w->response.sent], w->response.data->len - w->response.sent, MSG_DONTWAIT);
if(likely(bytes > 0)) {
+ w->stats_sent_bytes += bytes;
w->response.sent += bytes;
debug(D_WEB_CLIENT, "%llu: Sent %d bytes.", w->id, bytes);
}
- else if(likely(bytes == 0)) debug(D_WEB_CLIENT, "%llu: Did not send any bytes to the client.", w->id);
- else debug(D_WEB_CLIENT, "%llu: Failed to send data to client.", w->id);
+ else if(likely(bytes == 0)) {
+ error("%llu: Did not send any bytes to the client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
+ else {
+ error("%llu: Failed to send data to client.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
return(bytes);
}
-long web_client_receive(struct web_client *w)
+ssize_t web_client_receive(struct web_client *w)
{
// do we have any space for more data?
buffer_need_bytes(w->response.data, WEB_REQUEST_LENGTH);
- long left = w->response.data->size - w->response.data->len;
- long bytes;
+ ssize_t left = w->response.data->size - w->response.data->len;
+ ssize_t bytes;
if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY))
bytes = read(w->ifd, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1));
bytes = recv(w->ifd, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1), MSG_DONTWAIT);
if(likely(bytes > 0)) {
+ if(w->mode != WEB_CLIENT_MODE_FILECOPY)
+ w->stats_received_bytes += bytes;
+
size_t old = w->response.data->len;
w->response.data->len += bytes;
w->response.data->buffer[w->response.data->len] = '\0';
if(w->mode == WEB_CLIENT_MODE_FILECOPY) {
w->wait_send = 1;
- if(w->response.rlen && w->response.data->len >= w->response.rlen) w->wait_receive = 0;
+
+ if(w->response.rlen && w->response.data->len >= w->response.rlen)
+ w->wait_receive = 0;
}
}
else if(likely(bytes == 0)) {
// we are copying data from ifd to ofd
// let it finish copying...
w->wait_receive = 0;
- debug(D_WEB_CLIENT, "%llu: Disabling input.", w->id);
+
+ debug(D_WEB_CLIENT, "%llu: Read the whole file.", w->id);
+ if(w->ifd != w->ofd) close(w->ifd);
+ w->ifd = w->ofd;
}
else {
- bytes = -1;
- errno = 0;
+ error("%llu: failed to receive data.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
}
}
+ else {
+ error("%llu: receive data failed.", w->id);
+ WEB_CLIENT_IS_DEAD(w);
+ }
return(bytes);
}
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
error("Cannot set pthread cancel state to ENABLE.");
- struct timeval tv;
struct web_client *w = ptr;
int retval;
fd_set ifds, ofds, efds;
log_access("%llu: %s port %s connected on thread task id %d", w->id, w->client_ip, w->client_port, gettid());
for(;;) {
+ if(unlikely(w->dead)) {
+ error("%llu: client is dead.");
+ break;
+ }
+ else if(unlikely(!w->wait_receive && !w->wait_send)) {
+ error("%llu: client is not set for neither receiving nor sending data.");
+ break;
+ }
+
FD_ZERO (&ifds);
FD_ZERO (&ofds);
FD_ZERO (&efds);
if(w->ofd > fdmax) fdmax = w->ofd;
}
- tv.tv_sec = web_client_timeout;
- tv.tv_usec = 0;
-
debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, w->wait_receive?"INPUT":"", w->wait_send?"OUTPUT":"");
+ struct timeval tv = { .tv_sec = web_client_timeout, .tv_usec = 0 };
retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
if(retval == -1) {
- debug(D_WEB_CLIENT_ACCESS, "%llu: LISTENER: select() failed.", w->id);
- continue;
+ error("%llu: LISTENER: select() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd);
+ break;
}
else if(!retval) {
// timeout
}
if(w->wait_send && FD_ISSET(w->ofd, &ofds)) {
- long bytes;
+ ssize_t bytes;
if((bytes = web_client_send(w)) < 0) {
debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
errno = 0;
break;
}
- else
- w->stats_sent_bytes += bytes;
}
if(w->wait_receive && FD_ISSET(w->ifd, &ifds)) {
- long bytes;
+ ssize_t bytes;
if((bytes = web_client_receive(w)) < 0) {
debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
errno = 0;
break;
}
- else
- w->stats_received_bytes += bytes;
if(w->mode == WEB_CLIENT_MODE_NORMAL) {
debug(D_WEB_CLIENT, "%llu: Attempting to process received data (%ld bytes).", w->id, bytes);
web_client_reset(w);
w->obsolete = 1;
+ pthread_exit(NULL);
return NULL;
}
int code; // the HTTP response code
- size_t rlen; // if non-zero, the excepted size of ifd (input)
+ size_t rlen; // if non-zero, the excepted size of ifd (input of firecopy)
size_t sent; // current data length sent to output
int zoutput; // if set to 1, web_client_send() will send compressed data
#ifdef NETDATA_WITH_ZLIB
z_stream zstream; // zlib stream for sending compressed output to client
Bytef zbuffer[ZLIB_CHUNK]; // temporary buffer for storing compressed output
- long zsent; // the compressed bytes we have sent to the client
- long zhave; // the compressed bytes that we have to send
- int zinitialized;
+ size_t zsent; // the compressed bytes we have sent to the client
+ size_t zhave; // the compressed bytes that we have received from zlib
+ int zinitialized:1;
#endif
};
struct web_client {
unsigned long long id;
+ uint8_t obsolete:1; // if set to 1, the listener will remove this client
+ // after setting this to 1, you should not touch
+ // this web_client
+
+ uint8_t dead:1; // if set to 1, this client is dead
+
+ uint8_t keepalive:1; // if set to 1, the web client will be re-used
+ uint8_t enable_gzip:1; // if set to 1, the response will be compressed
+
+ uint8_t mode:3; // the operational mode of the client
+
+ uint8_t wait_receive:1; // 1 = we are waiting more input data
+ uint8_t wait_send:1; // 1 = we have data to send to the client
+
+ int ifd;
+ int ofd;
+
char client_ip[NI_MAXHOST+1];
char client_port[NI_MAXSERV+1];
char cookie2[COOKIE_MAX+1];
char origin[ORIGIN_MAX+1];
- int mode;
- int keepalive;
- int enable_gzip;
-
struct sockaddr_storage clientaddr;
-
- pthread_t thread; // the thread servicing this client
- int obsolete; // if set to 1, the listener will remove this client
-
- int ifd;
- int ofd;
-
struct response response;
- int wait_receive;
- int wait_send;
+ size_t stats_received_bytes;
+ size_t stats_sent_bytes;
- unsigned long stats_received_bytes;
- unsigned long stats_sent_bytes;
+ pthread_t thread; // the thread servicing this client
struct web_client *prev;
struct web_client *next;
};
+#define WEB_CLIENT_IS_DEAD(w) (w)->dead=1
+
extern struct web_client *web_clients;
extern uid_t web_files_uid(void);
extern struct web_client *web_client_create(int listener);
extern struct web_client *web_client_free(struct web_client *w);
+extern ssize_t web_client_send(struct web_client *w);
+extern ssize_t web_client_receive(struct web_client *w);
+extern void web_client_process(struct web_client *w);
+extern void web_client_reset(struct web_client *w);
extern void *web_client_main(void *ptr);
if(is_ip_anything(ip)) {
name.sin_addr.s_addr = htonl(INADDR_ANY);
- info("Listening on any IPs (IPv4).");
+ // info("Listening on any IPs (IPv4).");
}
else {
int ret = inet_pton(AF_INET, ip, (void *)&name.sin_addr.s_addr);
close(sock);
return -1;
}
- info("Listening on IP '%s' (IPv4).", ip);
+ // info("Listening on IP '%s' (IPv4).", ip);
}
if(bind (sock, (struct sockaddr *) &name, sizeof (name)) < 0) {
if(is_ip_anything(ip)) {
name.sin6_addr = in6addr_any;
- info("Listening on all IPs (IPv6 and IPv4)");
+ // info("Listening on all IPs (IPv6 and IPv4)");
}
else {
int ret = inet_pton(AF_INET6, ip, (void *)&name.sin6_addr.s6_addr);
close(sock);
return -1;
}
- info("Listening on IP '%s' (IPv6)", ip);
+ // info("Listening on IP '%s' (IPv6)", ip);
}
name.sin6_scope_id = 0;
}
+int create_listen_socket(void) {
+ listen_backlog = (int) config_get_number("global", "http port listen backlog", LISTEN_BACKLOG);
+
+ listen_port = (int) config_get_number("global", "port", LISTEN_PORT);
+ if(listen_port < 1 || listen_port > 65535) {
+ error("Invalid listen port %d given. Defaulting to %d.", listen_port, LISTEN_PORT);
+ listen_port = LISTEN_PORT;
+ }
+ else debug(D_OPTIONS, "Listen port set to %d.", listen_port);
+
+ int ip = 0;
+ char *ipv = config_get("global", "ip version", "any");
+ if(!strcmp(ipv, "any") || !strcmp(ipv, "both") || !strcmp(ipv, "all")) ip = 0;
+ else if(!strcmp(ipv, "ipv4") || !strcmp(ipv, "IPV4") || !strcmp(ipv, "IPv4") || !strcmp(ipv, "4")) ip = 4;
+ else if(!strcmp(ipv, "ipv6") || !strcmp(ipv, "IPV6") || !strcmp(ipv, "IPv6") || !strcmp(ipv, "6")) ip = 6;
+ else error("Cannot understand ip version '%s'. Assuming 'any'.", ipv);
+
+ if(ip == 0 || ip == 6) listen_fd = create_listen_socket6(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
+ if(listen_fd < 0) {
+ listen_fd = create_listen_socket4(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
+ // if(listen_fd >= 0 && ip != 4) info("Managed to open an IPv4 socket on port %d.", listen_port);
+ }
+
+ return listen_fd;
+}
+
// --------------------------------------------------------------------------------------
// the main socket listener
// 3. spawns a new pthread to serve the client (this is optimal for keep-alive clients)
// 4. cleans up old web_clients that their pthreads have been exited
-void *socket_listen_main(void *ptr)
-{
+void *socket_listen_main_multi_threaded(void *ptr) {
if(ptr) { ; }
- info("WEB SERVER thread created with task id %d", gettid());
+ info("Multi-threaded WEB SERVER thread created with task id %d", gettid());
struct web_client *w;
struct timeval tv;
- int retval;
+ int retval, failures = 0;
if(ptr) { ; }
if(listen_fd < 0) fatal("LISTENER: Listen socket is not ready.");
- fd_set ifds, ofds, efds;
- int fdmax = listen_fd;
-
+ fd_set ifds;
FD_ZERO (&ifds);
- FD_ZERO (&ofds);
- FD_ZERO (&efds);
for(;;) {
tv.tv_sec = 0;
tv.tv_usec = 200000;
- if(listen_fd >= 0) {
+ if(likely(listen_fd >= 0))
FD_SET(listen_fd, &ifds);
- FD_SET(listen_fd, &efds);
- }
// debug(D_WEB_CLIENT, "LISTENER: Waiting...");
- retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
+ retval = select(listen_fd + 1, &ifds, NULL, NULL, &tv);
- if(retval == -1) {
+ if(unlikely(retval == -1)) {
error("LISTENER: select() failed.");
+ failures++;
+
+ if(failures > 10) {
+ error("LISTENER: our listen port %d seems dead. Re-opening it.", listen_fd);
+ close(listen_fd);
+ listen_fd = -1;
+ sleep(5);
+ create_listen_socket();
+ if(listen_fd < 0)
+ fatal("Cannot listen for web clients (connected clients %llu).", global_statistics.connected_clients);
+
+ failures = 0;
+ }
+
continue;
}
- else if(retval) {
+ else if(likely(retval)) {
// check for new incoming connections
- if(FD_ISSET(listen_fd, &ifds)) {
+ if(likely(FD_ISSET(listen_fd, &ifds))) {
w = web_client_create(listen_fd);
if(unlikely(!w)) {
// no need for error log - web_client_create already logged the error
// debug(D_WEB_CLIENT, "LISTENER: select() timeout.");
//}
+ failures = 0;
+
// cleanup unused clients
- for(w = web_clients; w ; w = w?w->next:NULL) {
- if(w->obsolete) {
+ for (w = web_clients; w; ) {
+ if (w->obsolete) {
debug(D_WEB_CLIENT, "%llu: Removing client.", w->id);
- // pthread_join(w->thread, NULL);
+ // pthread_cancel(w->thread);
+ // pthread_join(w->thread, NULL);
w = web_client_free(w);
#ifdef NETDATA_INTERNAL_CHECKS
log_allocations();
#endif
}
+ else w = w->next;
}
}
return NULL;
}
+void *socket_listen_main_single_threaded(void *ptr) {
+ if(ptr) { ; }
+
+ info("Single threaded WEB SERVER thread created with task id %d", gettid());
+
+ struct web_client *w;
+ int retval, failures = 0;
+
+ if(ptr) { ; }
+
+ if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
+ error("Cannot set pthread cancel type to DEFERRED.");
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ error("Cannot set pthread cancel state to ENABLE.");
+
+ web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
+ web_enable_gzip = config_get_boolean("global", "enable web responses gzip compression", web_enable_gzip);
+
+ if(listen_fd < 0) fatal("LISTENER: Listen socket is not ready.");
+
+ fd_set ifds, ofds, efds;
+ int fdmax = listen_fd;
+
+ for(;;) {
+ int has_obsolete = 0;
+ FD_ZERO (&ifds);
+ FD_ZERO (&ofds);
+ FD_ZERO (&efds);
+
+ if(listen_fd >= 0) {
+ // debug(D_WEB_CLIENT_ACCESS, "LISTENER: adding listen socket %d to ifds, efds", listen_fd);
+ FD_SET(listen_fd, &ifds);
+ FD_SET(listen_fd, &efds);
+ }
+
+ for(w = web_clients; w ; w = w->next) {
+ if(unlikely(w->dead)) {
+ error("%llu: client is dead.");
+ w->obsolete = 1;
+ }
+ else if(unlikely(!w->wait_receive && !w->wait_send)) {
+ error("%llu: client is not set for neither receiving nor sending data.");
+ w->obsolete = 1;
+ }
+
+ if(unlikely(w->obsolete)) {
+ has_obsolete++;
+ continue;
+ }
+
+ // debug(D_WEB_CLIENT_ACCESS, "%llu: adding input socket %d to efds", w->id, w->ifd);
+ FD_SET(w->ifd, &efds);
+ if(w->ifd > fdmax) fdmax = w->ifd;
+
+ if(w->ifd != w->ofd) {
+ // debug(D_WEB_CLIENT_ACCESS, "%llu: adding output socket %d to efds", w->id, w->ofd);
+ FD_SET(w->ofd, &efds);
+ if(w->ofd > fdmax) fdmax = w->ofd;
+ }
+
+ if (w->wait_receive) {
+ // debug(D_WEB_CLIENT_ACCESS, "%llu: adding input socket %d to ifds", w->id, w->ifd);
+ FD_SET(w->ifd, &ifds);
+ if(w->ifd > fdmax) fdmax = w->ifd;
+ }
+
+ if (w->wait_send) {
+ // debug(D_WEB_CLIENT_ACCESS, "%llu: adding output socket %d to ofds", w->id, w->ofd);
+ FD_SET(w->ofd, &ofds);
+ if(w->ofd > fdmax) fdmax = w->ofd;
+ }
+ }
+
+ // cleanup unused clients
+ if(unlikely(has_obsolete)) {
+ for (w = web_clients; w; ) {
+ if (w->obsolete) {
+ debug(D_WEB_CLIENT, "%llu: Removing client.", w->id);
+ w = web_client_free(w);
+#ifdef NETDATA_INTERNAL_CHECKS
+ log_allocations();
+#endif
+ }
+ else w = w->next;
+ }
+ }
+
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER: Waiting...");
+ struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
+ errno = 0;
+ retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
+
+ if(retval == -1) {
+ error("LISTENER: select() failed.");
+
+ if(errno != EAGAIN) {
+ // debug(D_WEB_CLIENT_ACCESS, "LISTENER: select() failed.");
+ error("REMOVING ALL %lu WEB CLIENTS !", global_statistics.connected_clients);
+ while (web_clients) web_client_free(web_clients);
+ }
+
+ failures++;
+ if(failures > 10) {
+ error("LISTENER: our listen port %d seems dead. Re-opening it.", listen_fd);
+ close(listen_fd);
+ listen_fd = -1;
+ sleep(5);
+ create_listen_socket();
+ if(listen_fd < 0)
+ fatal("Cannot listen for web clients (connected clients %llu).", global_statistics.connected_clients);
+
+ failures = 0;
+ }
+
+ continue;
+ }
+ else if(retval) {
+ for(w = web_clients; w ; w = w->next) {
+ if (unlikely(w->obsolete)) continue;
+
+ if (unlikely(FD_ISSET(w->ifd, &efds))) {
+ debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on input socket.", w->id);
+ web_client_reset(w);
+ w->obsolete = 1;
+ continue;
+ }
+
+ if (unlikely(FD_ISSET(w->ofd, &efds))) {
+ debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on output socket.", w->id);
+ web_client_reset(w);
+ w->obsolete = 1;
+ continue;
+ }
+
+ if (unlikely(w->wait_receive && FD_ISSET(w->ifd, &ifds))) {
+ long bytes;
+ if (unlikely((bytes = web_client_receive(w)) < 0)) {
+ debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
+ errno = 0;
+ web_client_reset(w);
+ w->obsolete = 1;
+ continue;
+ }
+
+ if (w->mode == WEB_CLIENT_MODE_NORMAL) {
+ debug(D_WEB_CLIENT, "%llu: Processing received data (%ld bytes).", w->id, bytes);
+ // info("%llu: Attempting to process received data (%ld bytes).", w->id, bytes);
+ web_client_process(w);
+ }
+ else {
+ debug(D_WEB_CLIENT, "%llu: NO Processing for received data (%ld bytes).", w->id, bytes);
+ }
+ }
+
+ if (unlikely(w->wait_send && FD_ISSET(w->ofd, &ofds))) {
+ ssize_t bytes;
+ if (unlikely((bytes = web_client_send(w)) < 0)) {
+ debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
+ errno = 0;
+ web_client_reset(w);
+ w->obsolete = 1;
+ continue;
+ }
+ }
+ }
+
+ // check for new incoming connections
+ if(FD_ISSET(listen_fd, &ifds)) {
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection.");
+ web_client_create(listen_fd);
+ }
+ }
+ else {
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER: timeout.");
+ }
+
+ failures = 0;
+ }
+
+ error("LISTENER: exit!");
+
+ if(listen_fd >= 0) close(listen_fd);
+ exit(2);
+
+ return NULL;
+}
extern int create_listen_socket4(const char *ip, int port, int listen_backlog);
extern int create_listen_socket6(const char *ip, int port, int listen_backlog);
-extern void *socket_listen_main(void *ptr);
+extern void *socket_listen_main_multi_threaded(void *ptr);
+extern void *socket_listen_main_single_threaded(void *ptr);
+extern int create_listen_socket(void);
#endif /* NETDATA_WEB_SERVER_H */