From 56983e7505af6aac3b1f1084751157138bd9ee71 Mon Sep 17 00:00:00 2001 From: "Costa Tsaousis (ktsaou)" Date: Sun, 20 Nov 2016 16:07:42 +0200 Subject: [PATCH] added backend timeout and thread resource usage chart --- src/backends.c | 69 ++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/src/backends.c b/src/backends.c index b94d7036..75395973 100644 --- a/src/backends.c +++ b/src/backends.c @@ -4,7 +4,7 @@ #define BACKEND_SOURCE_DATA_AVERAGE 0x00000002 #define BACKEND_SOURCE_DATA_SUM 0x00000004 -int connect_to_socket4(const char *ip, int port) { +int connect_to_socket4(const char *ip, int port, struct timeval *timeout) { int sock; debug(D_LISTENER, "IPv4 connecting to ip '%s' port %d", ip, port); @@ -15,6 +15,9 @@ int connect_to_socket4(const char *ip, int port) { return -1; } + if(setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)timeout, sizeof(struct timeval)) < 0) + error("Failed to set timeout on the socket to ip '%s' port %d", ip, port); + struct sockaddr_in name; memset(&name, 0, sizeof(struct sockaddr_in)); name.sin_family = AF_INET; @@ -37,7 +40,7 @@ int connect_to_socket4(const char *ip, int port) { return sock; } -int connect_to_socket6(const char *ip, int port) { +int connect_to_socket6(const char *ip, int port, struct timeval *timeout) { int sock = -1; int ipv6only = 1; @@ -49,6 +52,9 @@ int connect_to_socket6(const char *ip, int port) { return -1; } + if(setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)timeout, sizeof(struct timeval)) < 0) + error("Failed to set timeout on the socket to ip '%s' port %d", ip, port); + /* IPv6 only */ if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&ipv6only, sizeof(ipv6only)) != 0) error("Cannot set IPV6_V6ONLY on ip '%s' port's %d.", ip, port); @@ -78,7 +84,7 @@ int connect_to_socket6(const char *ip, int port) { } -static inline int connect_to_one(const char *definition, int default_port) { +static inline int connect_to_one(const char *definition, int default_port, struct timeval *timeout) { struct addrinfo hints; struct addrinfo *result = NULL, *rp = NULL; @@ -139,7 +145,7 @@ static inline int connect_to_one(const char *definition, int default_port) { struct sockaddr_in *sin = (struct sockaddr_in *) rp->ai_addr; inet_ntop(AF_INET, &sin->sin_addr, rip, INET_ADDRSTRLEN); rport = ntohs(sin->sin_port); - fd = connect_to_socket4(rip, rport); + fd = connect_to_socket4(rip, rport, timeout); break; } @@ -147,7 +153,7 @@ static inline int connect_to_one(const char *definition, int default_port) { struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) rp->ai_addr; inet_ntop(AF_INET6, &sin6->sin6_addr, rip, INET6_ADDRSTRLEN); rport = ntohs(sin6->sin6_port); - fd = connect_to_socket6(rip, rport); + fd = connect_to_socket6(rip, rport, timeout); break; } } @@ -265,9 +271,13 @@ void *backends_main(void *ptr) { // ------------------------------------------------------------------------ // collect configuration options + struct timeval timeout = { + .tv_sec = 0, + .tv_usec = 0 + }; int default_port = 0; int sock = -1; - uint32_t options = BACKEND_SOURCE_DATA_AVERAGE; + uint32_t options; int enabled = config_get_boolean("backend", "enable", 0); const char *source = config_get("backend", "data source", "average"); const char *type = config_get("backend", "type", "graphite"); @@ -276,6 +286,7 @@ void *backends_main(void *ptr) { const char *hostname = config_get("backend", "hostname", localhost.hostname); int frequency = (int)config_get_number("backend", "update every", 10); int buffer_on_failures = (int)config_get_number("backend", "buffer on failures", 10); + long timeoutms = config_get_number("backend", "timeout ms", frequency * 2 * 1000); // ------------------------------------------------------------------------ // validate configuration options @@ -316,9 +327,18 @@ void *backends_main(void *ptr) { goto cleanup; } + if(timeoutms < 1) { + error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000); + timeoutms = frequency * 2 * 1000; + } + timeout.tv_sec = (timeoutms * 1000) / 1000000; + timeout.tv_usec = (timeoutms * 1000) % 1000000; + // ------------------------------------------------------------------------ // prepare the charts for monitoring the backend + struct rusage thread; + collected_number chart_buffered_metrics = 0, chart_lost_metrics = 0, @@ -363,6 +383,13 @@ void *backends_main(void *ptr) { rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRDDIM_ABSOLUTE); } + RRDSET *chart_rusage = rrdset_find("netdata.backend_thread_cpu"); + if(!chart_rusage) { + chart_rusage = rrdset_create("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency, RRDSET_TYPE_STACKED); + rrddim_add(chart_rusage, "user", NULL, 1, 1000, RRDDIM_INCREMENTAL); + rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRDDIM_INCREMENTAL); + } + // ------------------------------------------------------------------------ // prepare the backend main loop @@ -455,7 +482,7 @@ void *backends_main(void *ptr) { char buf[e - s + 1]; strncpyz(buf, s, e - s); chart_backend_reconnects++; - sock = connect_to_one(buf, default_port); + sock = connect_to_one(buf, default_port, &timeout); if(sock != -1) break; s = e; } @@ -470,7 +497,11 @@ void *backends_main(void *ptr) { if(likely(sock != -1)) { size_t len = buffer_strlen(b); unsigned long long start_ut = time_usec(); - ssize_t written = write(sock, buffer_tostring(b), len); + int flags = 0; +#ifdef MSG_NOSIGNAL + flags += MSG_NOSIGNAL; +#endif + ssize_t written = send(sock, buffer_tostring(b), len, flags); chart_backend_latency += time_usec() - start_ut; if(written != -1 && (size_t)written == len) { // we sent the data successfully @@ -530,10 +561,10 @@ void *backends_main(void *ptr) { // update the monitoring charts if(chart_ops->counter_done) rrdset_next(chart_ops); - rrddim_set(chart_ops, "write", chart_transmission_successes); - rrddim_set(chart_ops, "discard", chart_data_lost_events); - rrddim_set(chart_ops, "failure", chart_transmission_failures); - rrddim_set(chart_ops, "reconnect", chart_backend_reconnects); + rrddim_set(chart_ops, "write", chart_transmission_successes); + rrddim_set(chart_ops, "discard", chart_data_lost_events); + rrddim_set(chart_ops, "failure", chart_transmission_failures); + rrddim_set(chart_ops, "reconnect", chart_backend_reconnects); rrdset_done(chart_ops); if(chart_metrics->counter_done) rrdset_next(chart_metrics); @@ -543,15 +574,21 @@ void *backends_main(void *ptr) { rrdset_done(chart_metrics); if(chart_bytes->counter_done) rrdset_next(chart_bytes); - rrddim_set(chart_bytes, "buffered", chart_buffered_bytes); - rrddim_set(chart_bytes, "lost", chart_lost_bytes); - rrddim_set(chart_bytes, "sent", chart_sent_bytes); + rrddim_set(chart_bytes, "buffered", chart_buffered_bytes); + rrddim_set(chart_bytes, "lost", chart_lost_bytes); + rrddim_set(chart_bytes, "sent", chart_sent_bytes); rrdset_done(chart_bytes); if(chart_latency->counter_done) rrdset_next(chart_latency); - rrddim_set(chart_latency, "latency", chart_backend_latency); + rrddim_set(chart_latency, "latency", chart_backend_latency); rrdset_done(chart_latency); + getrusage(RUSAGE_THREAD, &thread); + if(chart_rusage->counter_done) rrdset_next(chart_rusage); + rrddim_set(chart_rusage, "user", thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec); + rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec); + rrdset_done(chart_rusage); + if(likely(buffer_strlen(b) == 0)) chart_buffered_metrics = 0; -- 2.39.2