From d6d37b5b2f80bb0026a88d98b2500e13df9f171e Mon Sep 17 00:00:00 2001 From: "Costa Tsaousis (ktsaou)" Date: Sat, 24 Dec 2016 20:04:43 +0200 Subject: [PATCH] dequeue backends responses from the socket; #1432 --- src/backends.c | 97 +++++++++++++++++++++++++++++++++++++++++++++----- src/socket.c | 2 +- 2 files changed, 89 insertions(+), 10 deletions(-) diff --git a/src/backends.c b/src/backends.c index 64f25524..966476db 100644 --- a/src/backends.c +++ b/src/backends.c @@ -94,11 +94,24 @@ static inline int format_dimension_stored_opentsdb_telnet(BUFFER *b, const char return 0; } +static inline int process_graphite_response(BUFFER *b) { + info("Received %zu bytes from graphite backend. Ignoring them.", buffer_strlen(b)); + buffer_flush(b); + return 0; +} + +static inline int process_opentsdb_response(BUFFER *b) { + info("Received %zu bytes from opentsdb backend. Ignoring them.", buffer_strlen(b)); + buffer_flush(b); + return 0; +} + void *backends_main(void *ptr) { (void)ptr; - BUFFER *b = buffer_create(1); - int (*formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options); + BUFFER *b = buffer_create(1), *response = buffer_create(1); + int (*backend_request_formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) = NULL; + int (*backend_response_checker)(BUFFER *b) = NULL; info("BACKEND thread created with task id %d", gettid()); @@ -151,22 +164,31 @@ void *backends_main(void *ptr) { if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { default_port = 2003; if(options == BACKEND_SOURCE_DATA_AS_COLLECTED) - formatter = format_dimension_collected_graphite_plaintext; + backend_request_formatter = format_dimension_collected_graphite_plaintext; else - formatter = format_dimension_stored_graphite_plaintext; + backend_request_formatter = format_dimension_stored_graphite_plaintext; + + backend_response_checker = process_graphite_response; } else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { default_port = 4242; if(options == BACKEND_SOURCE_DATA_AS_COLLECTED) - formatter = format_dimension_collected_opentsdb_telnet; + backend_request_formatter = format_dimension_collected_opentsdb_telnet; else - formatter = format_dimension_stored_opentsdb_telnet; + backend_request_formatter = format_dimension_stored_opentsdb_telnet; + + backend_response_checker = process_opentsdb_response; } else { error("Unknown backend type '%s'", type); goto cleanup; } + if(backend_request_formatter == NULL || backend_response_checker == NULL) { + error("backend is misconfigured - disabling it."); + goto cleanup; + } + if(timeoutms < 1) { error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000); timeoutms = frequency * 2 * 1000; @@ -273,7 +295,7 @@ void *backends_main(void *ptr) { RRDDIM *rd; for(rd = st->dimensions; rd ;rd = rd->next) { if(rd->last_collected_time.tv_sec >= after) - chart_buffered_metrics += formatter(b, prefix, &localhost, hostname, st, rd, after, before, options); + chart_buffered_metrics += backend_request_formatter(b, prefix, &localhost, hostname, st, rd, after, before, options); } pthread_rwlock_unlock(&st->rwlock); @@ -283,6 +305,8 @@ void *backends_main(void *ptr) { if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0)) error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate); + // ------------------------------------------------------------------------ + chart_buffered_bytes = (collected_number)buffer_strlen(b); // reset the monitoring chart counters @@ -302,7 +326,58 @@ void *backends_main(void *ptr) { //fprintf(stderr, "after = %lu, before = %lu\n", after, before); // ------------------------------------------------------------------------ - // connect to a backend server + // if we are connected, receive a response, without blocking + + if(likely(sock != -1)) { + if(likely(fcntl(sock, F_SETFL, O_NONBLOCK) >= 0)) { + errno = 0; + + // loop through to collect all data + while(sock != -1 && errno != EWOULDBLOCK) { + buffer_need_bytes(response, 4096); + + ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len + , O_NONBLOCK); + if(likely(r > 0)) { + // we received some data + response->len += r; + } + else if(r == 0) { + error("Backend '%s' closed the socket", destination); + close(sock); + sock = -1; + } + else { + // failed to receive data + if(errno != EAGAIN && errno != EWOULDBLOCK) + error("Cannot receive data from backend '%s'.", destination); + } + } + + if(sock != -1) { + // unset O_NONBLOCK + int val = fcntl(sock, F_GETFL, 0); + if(likely(val >= 0)) { + int flags = O_NONBLOCK; + val &= ~flags; + if(unlikely(fcntl(sock, F_SETFL, val) < 0)) + error("Cannot unset O_NONBLOCK from backend's '%s' socket.", destination); + } + else + error("Cannot get active flags from backend's '%s' socket.", destination); + } + + // if we received data, process them + if(buffer_strlen(response)) + backend_response_checker(response); + } + else { + error("Cannot set O_NONBLOCK from backend's '%s' socket. Receiving data from backend skipped.", destination); + } + } + + // ------------------------------------------------------------------------ + // if we are not connected, connect to a backend server if(unlikely(sock == -1)) { usec_t start_ut = now_realtime_usec(); @@ -332,7 +407,7 @@ void *backends_main(void *ptr) { if(unlikely(netdata_exit)) break; // ------------------------------------------------------------------------ - // send our buffer to the backend server + // if we are connected, send our buffer to the backend server if(likely(sock != -1)) { size_t len = buffer_strlen(b); @@ -341,6 +416,7 @@ void *backends_main(void *ptr) { #ifdef MSG_NOSIGNAL flags += MSG_NOSIGNAL; #endif + ssize_t written = send(sock, buffer_tostring(b), len, flags); chart_backend_latency += now_realtime_usec() - start_ut; if(written != -1 && (size_t)written == len) { @@ -439,6 +515,9 @@ cleanup: if(sock != -1) close(sock); + buffer_free(b); + buffer_free(response); + info("BACKEND thread exiting"); pthread_exit(NULL); diff --git a/src/socket.c b/src/socket.c index 681e6c71..448ec759 100644 --- a/src/socket.c +++ b/src/socket.c @@ -85,7 +85,7 @@ int connect_to(const char *definition, int default_port, struct timeval *timeout int ai_err = getaddrinfo(host, service, &hints, &ai_head); if (ai_err != 0) { - error("Cannot resolve host '%s', port '%s': %s\n", host, service, gai_strerror(ai_err)); + error("Cannot resolve host '%s', port '%s': %s", host, service, gai_strerror(ai_err)); return -1; } -- 2.39.2