if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
error("Cannot set pthread cancel state to ENABLE.");
- web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
- web_enable_gzip = config_get_boolean("global", "enable web responses gzip compression", web_enable_gzip);
-
- if(listen_fd < 0) fatal("LISTENER: Listen socket is not ready.");
+ if(listen_fd < 0 || listen_fd >= FD_SETSIZE) fatal("LISTENER: Listen socket %d is not ready, or invalid.", listen_fd);
fd_set ifds;
FD_ZERO (&ifds);
}
error("LISTENER: exit!");
+ close(listen_fd);
+ listen_fd = -1;
+ return NULL;
+}
- if(listen_fd >= 0) close(listen_fd);
- exit(2);
+struct web_client *single_threaded_clients[FD_SETSIZE];
- return NULL;
+static inline int single_threaded_link_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds, int *max) {
+ if(unlikely(w->obsolete || w->dead || (!w->wait_receive && !w->wait_send)))
+ return 1;
+
+ if(unlikely(w->ifd < 0 || w->ifd >= FD_SETSIZE || w->ofd < 0 || w->ofd >= FD_SETSIZE)) {
+ error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd < FD_SETSIZE (%d)", w->id, w->ifd, w->ofd, FD_SETSIZE);
+ return 1;
+ }
+
+ FD_SET(w->ifd, efds);
+ if(unlikely(*max < w->ifd)) *max = w->ifd;
+
+ if(unlikely(w->ifd != w->ofd)) {
+ if(*max < w->ofd) *max = w->ofd;
+ FD_SET(w->ofd, efds);
+ }
+
+ if(w->wait_receive) FD_SET(w->ifd, ifds);
+ if(w->wait_send) FD_SET(w->ofd, ofds);
+
+ single_threaded_clients[w->ifd] = w;
+ single_threaded_clients[w->ofd] = w;
+
+ return 0;
+}
+
+static inline int single_threaded_unlink_client(struct web_client *w, fd_set *ifds, fd_set *ofds, fd_set *efds) {
+ FD_CLR(w->ifd, efds);
+ if(unlikely(w->ifd != w->ofd)) FD_CLR(w->ofd, efds);
+
+ if(w->wait_receive) FD_CLR(w->ifd, ifds);
+ if(w->wait_send) FD_CLR(w->ofd, ofds);
+
+ single_threaded_clients[w->ifd] = NULL;
+ single_threaded_clients[w->ofd] = NULL;
+
+ if(unlikely(w->obsolete || w->dead || (!w->wait_receive && !w->wait_send)))
+ return 1;
+
+ return 0;
}
void *socket_listen_main_single_threaded(void *ptr) {
- if(ptr) { ; }
+ (void)ptr;
web_server_mode = WEB_SERVER_MODE_SINGLE_THREADED;
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
error("Cannot set pthread cancel state to ENABLE.");
- web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
- web_enable_gzip = config_get_boolean("global", "enable web responses gzip compression", web_enable_gzip);
+ if(listen_fd < 0 || listen_fd >= FD_SETSIZE)
+ fatal("LISTENER: Listen socket %d is not ready, or invalid.", listen_fd);
- if(listen_fd < 0) fatal("LISTENER: Listen socket is not ready.");
+ int i;
+ for(i = 0; i < FD_SETSIZE ; i++)
+ single_threaded_clients[i] = NULL;
- fd_set ifds, ofds, efds;
+ fd_set ifds, ofds, efds, rifds, rofds, refds;
+ FD_ZERO (&ifds);
+ FD_ZERO (&ofds);
+ FD_ZERO (&efds);
+ FD_SET(listen_fd, &ifds);
+ FD_SET(listen_fd, &efds);
int fdmax = listen_fd;
for(;;) {
- int has_obsolete = 0;
- FD_ZERO (&ifds);
- FD_ZERO (&ofds);
- FD_ZERO (&efds);
-
- if(listen_fd >= 0) {
- // debug(D_WEB_CLIENT_ACCESS, "LISTENER: adding listen socket %d to ifds, efds", listen_fd);
- FD_SET(listen_fd, &ifds);
- FD_SET(listen_fd, &efds);
- }
-
- for(w = web_clients; w ; w = w->next) {
- if(unlikely(w->dead)) {
- error("%llu: client is dead.", w->id);
- w->obsolete = 1;
- }
- else if(unlikely(!w->wait_receive && !w->wait_send)) {
- error("%llu: client is not set for neither receiving nor sending data.");
- w->obsolete = 1;
- }
-
- if(w->ifd < 0 || w->ifd >= FD_SETSIZE || w->ofd < 0 || w->ofd >= FD_SETSIZE) {
- error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd < FD_SETSIZE (%d)", w->id, w->ifd, w->ofd, FD_SETSIZE);
- w->obsolete = 1;
- }
-
- if(unlikely(w->obsolete)) {
- has_obsolete++;
- continue;
- }
-
- // debug(D_WEB_CLIENT_ACCESS, "%llu: adding input socket %d to efds", w->id, w->ifd);
- FD_SET(w->ifd, &efds);
- if(w->ifd > fdmax) fdmax = w->ifd;
-
- if(w->ifd != w->ofd) {
- // debug(D_WEB_CLIENT_ACCESS, "%llu: adding output socket %d to efds", w->id, w->ofd);
- FD_SET(w->ofd, &efds);
- if(w->ofd > fdmax) fdmax = w->ofd;
- }
-
- if (w->wait_receive) {
- // debug(D_WEB_CLIENT_ACCESS, "%llu: adding input socket %d to ifds", w->id, w->ifd);
- FD_SET(w->ifd, &ifds);
- if(w->ifd > fdmax) fdmax = w->ifd;
- }
-
- if (w->wait_send) {
- // debug(D_WEB_CLIENT_ACCESS, "%llu: adding output socket %d to ofds", w->id, w->ofd);
- FD_SET(w->ofd, &ofds);
- if(w->ofd > fdmax) fdmax = w->ofd;
- }
- }
-
- // cleanup unused clients
- if(unlikely(has_obsolete)) {
- for (w = web_clients; w; ) {
- if (w->obsolete) {
- debug(D_WEB_CLIENT, "%llu: Removing client.", w->id);
- w = web_client_free(w);
-#ifdef NETDATA_INTERNAL_CHECKS
- log_allocations();
-#endif
- }
- else w = w->next;
- }
- }
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server waiting (listen fd = %d, fdmax = %d)...", listen_fd, fdmax);
- debug(D_WEB_CLIENT_ACCESS, "LISTENER: Waiting...");
struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
- errno = 0;
- retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
+ rifds = ifds;
+ rofds = ofds;
+ refds = efds;
+ retval = select(fdmax+1, &rifds, &rofds, &refds, &tv);
- if(retval == -1) {
+ if(unlikely(retval == -1)) {
error("LISTENER: select() failed.");
-
- if(errno != EAGAIN) {
- // debug(D_WEB_CLIENT_ACCESS, "LISTENER: select() failed.");
- error("REMOVING ALL %lu WEB CLIENTS !", global_statistics.connected_clients);
- while (web_clients) web_client_free(web_clients);
- }
-
failures++;
if(failures > 10) {
+ if(global_statistics.connected_clients) {
+ error("REMOVING ALL %lu WEB CLIENTS !", global_statistics.connected_clients);
+ while (web_clients) {
+ single_threaded_unlink_client(web_clients, &ifds, &ofds, &efds);
+ web_client_free(web_clients);
+ }
+ }
+
error("LISTENER: our listen port %d seems dead. Re-opening it.", listen_fd);
+
close(listen_fd);
listen_fd = -1;
sleep(5);
+
create_listen_socket();
- if(listen_fd < 0)
+ if(listen_fd < 0 || listen_fd >= FD_SETSIZE)
fatal("Cannot listen for web clients (connected clients %llu).", global_statistics.connected_clients);
+ FD_ZERO (&ifds);
+ FD_ZERO (&ofds);
+ FD_ZERO (&efds);
+ FD_SET(listen_fd, &ifds);
+ FD_SET(listen_fd, &efds);
failures = 0;
}
-
- continue;
}
- else if(retval) {
- for(w = web_clients; w ; w = w->next) {
- if (unlikely(w->obsolete)) continue;
+ else if(likely(retval)) {
+ failures = 0;
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER: got something.");
- if (unlikely(FD_ISSET(w->ifd, &efds))) {
- debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on input socket.", w->id);
- web_client_reset(w);
- w->obsolete = 1;
+ if(FD_ISSET(listen_fd, &rifds)) {
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection.");
+ w = web_client_create(listen_fd);
+ if(single_threaded_link_client(w, &ifds, &ofds, &ifds, &fdmax) != 0) {
+ web_client_free(w);
+ }
+ }
+
+ for(i = 0 ; i <= fdmax ; i++) {
+ if(likely(!FD_ISSET(i, &rifds) && !FD_ISSET(i, &rofds) && !FD_ISSET(i, &refds)))
+ continue;
+
+ w = single_threaded_clients[i];
+ if(unlikely(!w))
+ continue;
+
+ if(unlikely(single_threaded_unlink_client(w, &ifds, &ofds, &efds) != 0)) {
+ web_client_free(w);
continue;
}
- if (unlikely(FD_ISSET(w->ofd, &efds))) {
- debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on output socket.", w->id);
- web_client_reset(w);
- w->obsolete = 1;
+ if (unlikely(FD_ISSET(w->ifd, &refds) || FD_ISSET(w->ofd, &refds))) {
+ web_client_free(w);
continue;
}
- if (unlikely(w->wait_receive && FD_ISSET(w->ifd, &ifds))) {
- long bytes;
- if (unlikely((bytes = web_client_receive(w)) < 0)) {
- debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
- errno = 0;
- web_client_reset(w);
- w->obsolete = 1;
+ if (unlikely(w->wait_receive && FD_ISSET(w->ifd, &rifds))) {
+ if (unlikely(web_client_receive(w) < 0)) {
+ web_client_free(w);
continue;
}
- if (w->mode == WEB_CLIENT_MODE_NORMAL) {
- debug(D_WEB_CLIENT, "%llu: Processing received data (%ld bytes).", w->id, bytes);
- // info("%llu: Attempting to process received data (%ld bytes).", w->id, bytes);
+ if (w->mode != WEB_CLIENT_MODE_FILECOPY) {
+ debug(D_WEB_CLIENT, "%llu: Processing received data.", w->id);
web_client_process(w);
}
- else {
- debug(D_WEB_CLIENT, "%llu: NO Processing for received data (%ld bytes).", w->id, bytes);
- }
}
- if (unlikely(w->wait_send && FD_ISSET(w->ofd, &ofds))) {
- ssize_t bytes;
- if (unlikely((bytes = web_client_send(w)) < 0)) {
+ if (unlikely(w->wait_send && FD_ISSET(w->ofd, &rofds))) {
+ if (unlikely(web_client_send(w) < 0)) {
debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
- errno = 0;
- web_client_reset(w);
- w->obsolete = 1;
+ web_client_free(w);
continue;
}
}
- }
- // check for new incoming connections
- if(FD_ISSET(listen_fd, &ifds)) {
- debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection.");
- web_client_create(listen_fd);
+ if(unlikely(single_threaded_link_client(w, &ifds, &ofds, &efds, &fdmax) != 0)) {
+ web_client_free(w);
+ }
}
}
else {
- debug(D_WEB_CLIENT_ACCESS, "LISTENER: timeout.");
+ debug(D_WEB_CLIENT_ACCESS, "LISTENER: single threaded web server timeout.");
+#ifdef NETDATA_INTERNAL_CHECKS
+ log_allocations();
+#endif
}
-
- failures = 0;
}
error("LISTENER: exit!");
-
- if(listen_fd >= 0) close(listen_fd);
- exit(2);
-
+ close(listen_fd);
+ listen_fd = -1;
return NULL;
}