]> arthur.barton.de Git - netdata.git/commitdiff
do not try to reconnect too soon
authorCosta Tsaousis <costa@tsaousis.gr>
Tue, 21 Feb 2017 12:28:27 +0000 (14:28 +0200)
committerCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Tue, 21 Feb 2017 23:00:27 +0000 (01:00 +0200)
src/log.c
src/rrdpush.c
src/web_client.c

index 02cbca40e244ce29e0389e50be3451d802bdd51c..01beaf7ec4876e5ece8e343c79768b3c80530ac8 100644 (file)
--- a/src/log.c
+++ b/src/log.c
@@ -257,7 +257,7 @@ void info_int( const char *file, const char *function, const unsigned long line,
     log_date(stderr);
 
     va_start( args, fmt );
-    if(debug_flags) fprintf(stderr, "%s: INFO: (%04lu@%-10.10s:%-15.15s):", program_name, line, file, function);
+    if(debug_flags) fprintf(stderr, "%s: INFO: (%04lu@%-10.10s:%-15.15s): ", program_name, line, file, function);
     else            fprintf(stderr, "%s: INFO: ", program_name);
     vfprintf( stderr, fmt, args );
     va_end( args );
index 9d2ad79a9de9c9e2c4a2d69508b79ce9d5575c39..de2a455763287ed736ee12f206580be19878a887 100644 (file)
@@ -104,40 +104,43 @@ void rrdset_done_push(RRDSET *st) {
     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
         return;
 
+
+    rrdpush_lock();
+
     if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
         if(!error_shown)
-            error("PUSH: not ready - discarding collected metrics.");
+            error("STREAM: not ready - discarding collected metrics.");
 
         error_shown = 1;
+
+        rrdpush_unlock();
         return;
     }
     error_shown = 0;
 
-    rrdpush_lock();
-    rrdset_rdlock(st);
-
     if(st->rrdhost != last_host) {
         buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
         last_host = st->rrdhost;
     }
 
+    rrdset_rdlock(st);
     if(need_to_send_chart_definition(st))
         send_chart_definition(st);
 
     send_chart_metrics(st);
+    rrdset_unlock(st);
 
     // signal the sender there are more data
     if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
-        error("Cannot write to internal pipe");
+        error("STREAM: cannot write to internal pipe");
 
-    rrdset_unlock(st);
     rrdpush_unlock();
 }
 
 static inline void rrdpush_flush(void) {
     rrdpush_lock();
     if(buffer_strlen(rrdpush_buffer))
-        error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
+        error("STREAM: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
 
     buffer_flush(rrdpush_buffer);
     reset_all_charts();
@@ -148,19 +151,19 @@ static inline void rrdpush_flush(void) {
 void *central_netdata_push_thread(void *ptr) {
     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
 
-    info("Central netdata push thread created with task id %d", gettid());
+    info("STREAM: central netdata push thread created with task id %d", gettid());
 
     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
-        error("Cannot set pthread cancel type to DEFERRED.");
+        error("STREAM: cannot set pthread cancel type to DEFERRED.");
 
     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
-        error("Cannot set pthread cancel state to ENABLE.");
+        error("STREAM: cannot set pthread cancel state to ENABLE.");
 
 
     rrdpush_buffer = buffer_create(1);
 
     if(pipe(rrdpush_pipe) == -1)
-        fatal("Cannot create required pipe.");
+        fatal("STREAM: cannot create required pipe.");
 
     struct timeval tv = {
             .tv_sec = 60,
@@ -176,6 +179,7 @@ void *central_netdata_push_thread(void *ptr) {
     int sock = -1;
 
     struct pollfd fds[2], *ifd, *ofd;
+    nfds_t fdmax;
 
     ifd = &fds[0];
     ofd = &fds[1];
@@ -184,32 +188,37 @@ void *central_netdata_push_thread(void *ptr) {
         if(netdata_exit) break;
 
         if(unlikely(sock == -1)) {
+            // stop appending data into rrdpush_buffer
+            // they will be lost, so there is no point to do it
             rrdpush_connected = 0;
 
-            info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
+            info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
 
             if(unlikely(sock == -1)) {
-                error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+                error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+                sleep(5);
                 continue;
             }
 
-            info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
+            info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
 
             char http[1000 + 1];
             snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
             if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
                 close(sock);
                 sock = -1;
-                error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
+                error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
                 sleep(5);
                 continue;
             }
 
+            info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
+
             if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
                 close(sock);
                 sock = -1;
-                error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
+                error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
                 sleep(5);
                 continue;
             }
@@ -217,16 +226,20 @@ void *central_netdata_push_thread(void *ptr) {
             if(strncmp(http, "STREAM", 6)) {
                 close(sock);
                 sock = -1;
-                error("PUSH: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
+                error("STREAM: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
                 sleep(5);
                 continue;
             }
 
+            info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
+
             if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
-                error("PUSH: cannot set non-blocking mode for socket.");
+                error("STREAM: cannot set non-blocking mode for socket.");
 
             rrdpush_flush();
             sent_connection = 0;
+
+            // allow appending data into rrdpush_buffer
             rrdpush_connected = 1;
         }
 
@@ -235,15 +248,15 @@ void *central_netdata_push_thread(void *ptr) {
         ifd->revents = 0;
 
         ofd->fd = sock;
-        ofd->events = POLLOUT;
         ofd->revents = 0;
-
-        nfds_t fdmax = 2;
-
-        if(begin < buffer_strlen(rrdpush_buffer))
+        if(begin < buffer_strlen(rrdpush_buffer)) {
             ofd->events = POLLOUT;
-        else
+            fdmax = 2;
+        }
+        else {
             ofd->events = 0;
+            fdmax = 1;
+        }
 
         if(netdata_exit) break;
         int retval = poll(fds, fdmax, 60 * 1000);
@@ -253,7 +266,7 @@ void *central_netdata_push_thread(void *ptr) {
             if(errno == EAGAIN || errno == EINTR)
                 continue;
 
-            error("PUSH: Failed to poll().");
+            error("STREAM: Failed to poll().");
             close(sock);
             sock = -1;
             break;
@@ -266,11 +279,11 @@ void *central_netdata_push_thread(void *ptr) {
         if(ifd->revents & POLLIN) {
             char buffer[1000 + 1];
             if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
-                error("PUSH: Cannot read from internal pipe.");
+                error("STREAM: Cannot read from internal pipe.");
         }
 
         if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
-            // info("PUSH: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
+            // info("STREAM: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
 
             // fprintf(stderr, "PUSH BEGIN\n");
             // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
@@ -280,7 +293,7 @@ void *central_netdata_push_thread(void *ptr) {
             ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
             if(ret == -1) {
                 if(errno != EAGAIN && errno != EINTR) {
-                    error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
+                    error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
                     close(sock);
                     sock = -1;
                 }
@@ -300,7 +313,7 @@ void *central_netdata_push_thread(void *ptr) {
         // protection from overflow
         if(rrdpush_buffer->len > max_size) {
             errno = 0;
-            error("PUSH: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
+            error("STREAM: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
             if(sock != -1) {
                 close(sock);
                 sock = -1;
@@ -308,7 +321,7 @@ void *central_netdata_push_thread(void *ptr) {
         }
     }
 
-    debug(D_WEB_CLIENT, "Central netdata push thread exits.");
+    debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
     if(sock != -1) {
         close(sock);
     }
index db578f9c2421244335dec776ca5fca4a7fa27056..784a28f37be6b578e6a390cdac9d739c9817f0bd 100644 (file)
@@ -1689,14 +1689,14 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
     }
 
     if(!key || !*key) {
-        error("STREAM request from client '%s:%s', without an API key. Forbidding access.", w->client_ip, w->client_port);
+        error("STREAM [%s]:%s: request without an API key. Forbidding access.", w->client_ip, w->client_port);
         buffer_flush(w->response.data);
         buffer_sprintf(w->response.data, "You need an API key for this request.");
         return 401;
     }
 
     if(!validate_stream_api_key(key)) {
-        error("STREAM request from client '%s:%s': API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+        error("STREAM [%s]:%s: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
         buffer_flush(w->response.data);
         buffer_sprintf(w->response.data, "Your API key is not permitted access.");
         return 401;
@@ -1719,16 +1719,17 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
     snprintfz(cd.fullfilename, FILENAME_MAX,     "%s:%s", w->client_ip, w->client_port);
     snprintfz(cd.cmd,          PLUGINSD_CMD_MAX, "%s:%s", w->client_ip, w->client_port);
 
+    info("STREAM [%s]:%s: sending STREAM to initiate streaming...", w->client_ip, w->client_port);
     if(send_timeout(w->ifd, "STREAM", 6, 0, 60) != 6) {
-        error("Cannot send STREAM to netdata at %s:%s", w->client_ip, w->client_port);
+        error("STREAM [%s]:%s: cannot send STREAM.", w->client_ip, w->client_port);
         buffer_flush(w->response.data);
-        buffer_sprintf(w->response.data, "STREAM failed to reply back with STREAM");
+        buffer_sprintf(w->response.data, "Failed to reply back with STREAM");
         return 400;
     }
 
     // remove the non-blocking flag from the socket
     if(fcntl(w->ifd, F_SETFL, fcntl(w->ifd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
-        error("STREAM from '%s:%s': cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
+        error("STREAM [%s]:%s: cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
 
     /*
     char buffer[1000 + 1];
@@ -1742,16 +1743,16 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
     // convert the socket to a FILE *
     FILE *fp = fdopen(w->ifd, "r");
     if(!fp) {
-        error("STREAM from '%s:%s': failed to get a FILE for FD %d.", w->client_ip, w->client_port, w->ifd);
+        error("STREAM [%s]:%s: failed to get a FILE for FD %d.", w->client_ip, w->client_port, w->ifd);
         buffer_flush(w->response.data);
         buffer_sprintf(w->response.data, "Failed to get a FILE for an FD.");
         return 500;
     }
 
     // call the plugins.d processor to receive the metrics
-    info("STREAM connecting client '%s:%s' to plugins.d.", w->client_ip, w->client_port);
+    info("STREAM [%s]:%s: connecting client to plugins.d.", w->client_ip, w->client_port);
     size_t count = pluginsd_process(host, &cd, fp, 1);
-    error("STREAM from '%s:%s': client disconnected.", w->client_ip, w->client_port);
+    error("STREAM [%s]:%s: client disconnected.", w->client_ip, w->client_port);
 
     // close all sockets, to let the socket worker we are done
     fclose(fp);