arthur.barton.de Git - netdata.git/commitdiff
dequeue backends responses from the socket; #1432
author Costa Tsaousis (ktsaou) <costa@tsaousis.gr>
Sat, 24 Dec 2016 18:04:43 +0000 (20:04 +0200)
committer Costa Tsaousis (ktsaou) <costa@tsaousis.gr>
Sat, 24 Dec 2016 18:04:43 +0000 (20:04 +0200)
src/backends.c
src/socket.c

index 64f25524c4fc9b3f468737222a99e4817a866985..966476dbed7e225f77ec46d6753c242486e71d85 100644 (file)
@@ -94,11 +94,24 @@ static inline int format_dimension_stored_opentsdb_telnet(BUFFER *b, const char
     return 0;
 }
 
+static inline int process_graphite_response(BUFFER *b) { // response handler installed for graphite backends: logs the reply size, then discards the reply
+    info("Received %zu bytes from graphite backend. Ignoring them.", buffer_strlen(b)); // buffer_strlen(b) is reported as the number of bytes received
+    buffer_flush(b); // presumably empties b so the next receive pass starts from a clean buffer - confirm against buffer_flush()
+    return 0; // always 0: the content of the reply is never inspected
+}
+
+static inline int process_opentsdb_response(BUFFER *b) { // response handler installed for opentsdb backends: logs the reply size, then discards the reply
+    info("Received %zu bytes from opentsdb backend. Ignoring them.", buffer_strlen(b)); // buffer_strlen(b) is reported as the number of bytes received
+    buffer_flush(b); // presumably empties b so the next receive pass starts from a clean buffer - confirm against buffer_flush()
+    return 0; // always 0: the content of the reply is never inspected
+}
+
 void *backends_main(void *ptr) {
     (void)ptr;
 
-    BUFFER *b = buffer_create(1);
-    int (*formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options);
+    BUFFER *b = buffer_create(1), *response = buffer_create(1);
+    int (*backend_request_formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) = NULL;
+    int (*backend_response_checker)(BUFFER *b) = NULL;
 
     info("BACKEND thread created with task id %d", gettid());
 
@@ -151,22 +164,31 @@ void *backends_main(void *ptr) {
     if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
         default_port = 2003;
         if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
-            formatter = format_dimension_collected_graphite_plaintext;
+            backend_request_formatter = format_dimension_collected_graphite_plaintext;
         else
-            formatter = format_dimension_stored_graphite_plaintext;
+            backend_request_formatter = format_dimension_stored_graphite_plaintext;
+
+        backend_response_checker = process_graphite_response;
     }
     else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
         default_port = 4242;
         if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
-            formatter = format_dimension_collected_opentsdb_telnet;
+            backend_request_formatter = format_dimension_collected_opentsdb_telnet;
         else
-            formatter = format_dimension_stored_opentsdb_telnet;
+            backend_request_formatter = format_dimension_stored_opentsdb_telnet;
+
+        backend_response_checker = process_opentsdb_response;
     }
     else {
         error("Unknown backend type '%s'", type);
         goto cleanup;
     }
 
+    if(backend_request_formatter == NULL || backend_response_checker == NULL) {
+        error("backend is misconfigured - disabling it.");
+        goto cleanup;
+    }
+
     if(timeoutms < 1) {
         error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000);
         timeoutms = frequency * 2 * 1000;
@@ -273,7 +295,7 @@ void *backends_main(void *ptr) {
             RRDDIM *rd;
             for(rd = st->dimensions; rd ;rd = rd->next) {
                 if(rd->last_collected_time.tv_sec >= after)
-                    chart_buffered_metrics += formatter(b, prefix, &localhost, hostname, st, rd, after, before, options);
+                    chart_buffered_metrics += backend_request_formatter(b, prefix, &localhost, hostname, st, rd, after, before, options);
             }
 
             pthread_rwlock_unlock(&st->rwlock);
@@ -283,6 +305,8 @@ void *backends_main(void *ptr) {
         if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
             error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
 
+        // ------------------------------------------------------------------------
+
         chart_buffered_bytes = (collected_number)buffer_strlen(b);
 
         // reset the monitoring chart counters
@@ -302,7 +326,58 @@ void *backends_main(void *ptr) {
         //fprintf(stderr, "after = %lu, before = %lu\n", after, before);
 
         // ------------------------------------------------------------------------
-        // connect to a backend server
+        // if we are connected, receive a response, without blocking
+
+        if(likely(sock != -1)) {
+            if(likely(fcntl(sock, F_SETFL, O_NONBLOCK) >= 0)) {
+                errno = 0;
+
+                // loop through to collect all data
+                while(sock != -1 && errno != EWOULDBLOCK) {
+                    buffer_need_bytes(response, 4096);
+
+                    ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len
+                                     , O_NONBLOCK);
+                    if(likely(r > 0)) {
+                        // we received some data
+                        response->len += r;
+                    }
+                    else if(r == 0) {
+                        error("Backend '%s' closed the socket", destination);
+                        close(sock);
+                        sock = -1;
+                    }
+                    else {
+                        // failed to receive data
+                        if(errno != EAGAIN && errno != EWOULDBLOCK)
+                            error("Cannot receive data from backend '%s'.", destination);
+                    }
+                }
+
+                if(sock != -1) {
+                    // unset O_NONBLOCK
+                    int val = fcntl(sock, F_GETFL, 0);
+                    if(likely(val >= 0)) {
+                        int flags = O_NONBLOCK;
+                        val &= ~flags;
+                        if(unlikely(fcntl(sock, F_SETFL, val) < 0))
+                            error("Cannot unset O_NONBLOCK from backend's '%s' socket.", destination);
+                    }
+                    else
+                        error("Cannot get active flags from backend's '%s' socket.", destination);
+                }
+
+                // if we received data, process them
+                if(buffer_strlen(response))
+                    backend_response_checker(response);
+            }
+            else {
+                error("Cannot set O_NONBLOCK from backend's '%s' socket. Receiving data from backend skipped.", destination);
+            }
+        }
+
+        // ------------------------------------------------------------------------
+        // if we are not connected, connect to a backend server
 
         if(unlikely(sock == -1)) {
             usec_t start_ut = now_realtime_usec();
@@ -332,7 +407,7 @@ void *backends_main(void *ptr) {
         if(unlikely(netdata_exit)) break;
 
         // ------------------------------------------------------------------------
-        // send our buffer to the backend server
+        // if we are connected, send our buffer to the backend server
 
         if(likely(sock != -1)) {
             size_t len = buffer_strlen(b);
@@ -341,6 +416,7 @@ void *backends_main(void *ptr) {
 #ifdef MSG_NOSIGNAL
             flags += MSG_NOSIGNAL;
 #endif
+
             ssize_t written = send(sock, buffer_tostring(b), len, flags);
             chart_backend_latency += now_realtime_usec() - start_ut;
             if(written != -1 && (size_t)written == len) {
@@ -439,6 +515,9 @@ cleanup:
     if(sock != -1)
         close(sock);
 
+    buffer_free(b);
+    buffer_free(response);
+
     info("BACKEND thread exiting");
 
     pthread_exit(NULL);
index 681e6c71f245e7f3a9f142e09e110069d2c89f2b..448ec75912dfa1a42e26ec163c265f3ee6b96731 100644 (file)
@@ -85,7 +85,7 @@ int connect_to(const char *definition, int default_port, struct timeval *timeout
 
     int ai_err = getaddrinfo(host, service, &hints, &ai_head);
     if (ai_err != 0) {
-        error("Cannot resolve host '%s', port '%s': %s\n", host, service, gai_strerror(ai_err));
+        error("Cannot resolve host '%s', port '%s': %s", host, service, gai_strerror(ai_err));
         return -1;
     }