X-Git-Url: https://arthur.barton.de/gitweb/?a=blobdiff_plain;f=src%2Fbackends.c;h=c5b57dbe7ef291022d2e7d86ae558eaa96fae672;hb=c98acefa301f23d8c7d2857b8cde5fbb9c2326a9;hp=026441250229fad960c9cf76681a0b47f6d7ce58;hpb=6c9f0d2d75ee95bcaa74eed9fd9192eb24de7120;p=netdata.git diff --git a/src/backends.c b/src/backends.c index 02644125..c5b57dbe 100644 --- a/src/backends.c +++ b/src/backends.c @@ -128,11 +128,13 @@ static inline int process_opentsdb_response(BUFFER *b) { } void *backends_main(void *ptr) { + int default_port = 0; + int sock = -1; struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; BUFFER *b = buffer_create(1), *response = buffer_create(1); - int (*backend_request_formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) = NULL; - int (*backend_response_checker)(BUFFER *b) = NULL; + int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, uint32_t) = NULL; + int (*backend_response_checker)(BUFFER *) = NULL; info("BACKEND thread created with task id %d", gettid()); @@ -149,22 +151,21 @@ void *backends_main(void *ptr) { .tv_sec = 0, .tv_usec = 0 }; - int default_port = 0; - int sock = -1; uint32_t options; - int enabled = config_get_boolean("backend", "enabled", 0); - const char *source = config_get("backend", "data source", "average"); - const char *type = config_get("backend", "type", "graphite"); + int enabled = config_get_boolean("backend", "enabled", 0); + const char *source = config_get("backend", "data source", "average"); + const char *type = config_get("backend", "type", "graphite"); const char *destination = config_get("backend", "destination", "localhost"); - const char *prefix = config_get("backend", "prefix", "netdata"); - const char *hostname = config_get("backend", "hostname", localhost->hostname); - int frequency = (int)config_get_number("backend", "update every", 10); - int buffer_on_failures = (int)config_get_number("backend", "buffer on failures", 10); - long timeoutms = config_get_number("backend", "timeout ms", frequency * 2 * 1000); + const char *prefix = config_get("backend", "prefix", "netdata"); + const char *hostname = config_get("backend", "hostname", localhost->hostname); + int frequency = (int)config_get_number("backend", "update every", 10); + int buffer_on_failures = (int)config_get_number("backend", "buffer on failures", 10); + long timeoutms = config_get_number("backend", "timeout ms", frequency * 2 * 1000); // ------------------------------------------------------------------------ // validate configuration options // and prepare for sending data to our backend + if(!enabled || frequency < 1) goto cleanup; @@ -182,6 +183,17 @@ void *backends_main(void *ptr) { goto cleanup; } + if(timeoutms < 1) { + error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000); + timeoutms = frequency * 2 * 1000; + } + timeout.tv_sec = (timeoutms * 1000) / 1000000; + timeout.tv_usec = (timeoutms * 1000) % 1000000; + + + // ------------------------------------------------------------------------ + // select the backend type + if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { default_port = 2003; if(options == BACKEND_SOURCE_DATA_AS_COLLECTED) @@ -210,15 +222,9 @@ void *backends_main(void *ptr) { goto cleanup; } - if(timeoutms < 1) { - error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000); - timeoutms = frequency * 2 * 1000; - } - timeout.tv_sec = (timeoutms * 1000) / 1000000; - timeout.tv_usec = (timeoutms * 1000) % 1000000; // ------------------------------------------------------------------------ - // prepare the charts for monitoring the backend + // prepare the charts for monitoring the backend operation struct rusage thread; @@ -237,37 +243,23 @@ void *backends_main(void *ptr) { chart_backend_reconnects = 0, chart_backend_latency = 0; - RRDSET *chart_metrics = rrdset_find_localhost("netdata.backend_metrics"); - if(!chart_metrics) { - chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL - , "Netdata Buffered Metrics", "metrics", 130600, frequency - , RRDSET_TYPE_LINE); - rrddim_add(chart_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_metrics, "lost", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_metrics, "sent", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } + RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", 130600, frequency, RRDSET_TYPE_LINE); + rrddim_add(chart_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_metrics, "lost", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_metrics, "sent", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - RRDSET *chart_bytes = rrdset_find_localhost("netdata.backend_bytes"); - if(!chart_bytes) { - chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL - , "Netdata Backend Data Size", "KB", 130610, frequency, RRDSET_TYPE_AREA); - rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_bytes, "lost", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_bytes, "sent", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); - } + RRDSET *chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KB", 130610, frequency, RRDSET_TYPE_AREA); + rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_bytes, "lost", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_bytes, "sent", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE); - RRDSET *chart_ops = rrdset_find_localhost("netdata.backend_ops"); - if(!chart_ops) { - chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL - , "Netdata Backend Operations", "operations", 130630, frequency - , RRDSET_TYPE_LINE); - rrddim_add(chart_ops, "write", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_ops, "discard", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_ops, "failure", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rrddim_add(chart_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } + RRDSET *chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", 130630, frequency, RRDSET_TYPE_LINE); + rrddim_add(chart_ops, "write", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_ops, "discard", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_ops, "failure", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrddim_add(chart_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); /* * this is misleading - we can only measure the time we need to send data @@ -276,21 +268,14 @@ void *backends_main(void *ptr) { * * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html * - RRDSET *chart_latency = rrdset_find_localhost("netdata.backend_latency"); - if(!chart_latency) { - chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA); - rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - } + RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA); + rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); */ - RRDSET *chart_rusage = rrdset_find_localhost("netdata.backend_thread_cpu"); - if(!chart_rusage) { - chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL - , "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency - , RRDSET_TYPE_STACKED); - rrddim_add(chart_rusage, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } + RRDSET *chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency, RRDSET_TYPE_STACKED); + rrddim_add(chart_rusage, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + // ------------------------------------------------------------------------ // prepare the backend main loop @@ -304,32 +289,43 @@ void *backends_main(void *ptr) { heartbeat_init(&hb); for(;;) { + // ------------------------------------------------------------------------ // Wait for the next iteration point. heartbeat_next(&hb, step_ut); time_t before = now_realtime_sec(); + // ------------------------------------------------------------------------ // add to the buffer the data we need to send to the backend - RRDSET *st; + int pthreadoldcancelstate; if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0)) error("Cannot set pthread cancel state to DISABLE."); - rrdhost_rdlock(localhost); - for(st = localhost->rrdset_root; st ;st = st->next) { - pthread_rwlock_rdlock(&st->rwlock); + rrd_rdlock(); + RRDHOST *host; + rrdhost_foreach_read(host) { + if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) + continue; - RRDDIM *rd; - for(rd = st->dimensions; rd ;rd = rd->next) { - if(rd->last_collected_time.tv_sec >= after) - chart_buffered_metrics += backend_request_formatter(b, prefix, localhost, hostname, st, rd, after, before, options); - } + rrdhost_rdlock(host); - pthread_rwlock_unlock(&st->rwlock); + RRDSET *st; + rrdset_foreach_read(st, host) { + rrdset_rdlock(st); + + RRDDIM *rd; + rrddim_foreach_read(rd, st) { + if(rd->last_collected_time.tv_sec >= after) + chart_buffered_metrics += backend_request_formatter(b, prefix, host, (host == localhost)?hostname:host->hostname, st, rd, after, before, options); + } + rrdset_unlock(st); + } + rrdhost_unlock(host); } - rrdhost_unlock(localhost); + rrd_unlock(); if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0)) error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate); @@ -399,26 +395,11 @@ void *backends_main(void *ptr) { if(unlikely(sock == -1)) { usec_t start_ut = now_monotonic_usec(); - const char *s = destination; - while(*s) { - const char *e = s; - - // skip separators, moving both s(tart) and e(nd) - while(isspace(*e) || *e == ',') s = ++e; + size_t reconnects = 0; - // move e(nd) to the first separator - while(*e && !isspace(*e) && *e != ',') e++; + sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0); - // is there anything? - if(!*s || s == e) break; - - char buf[e - s + 1]; - strncpyz(buf, s, e - s); - chart_backend_reconnects++; - sock = connect_to(buf, default_port, &timeout); - if(sock != -1) break; - s = e; - } + chart_backend_reconnects += reconnects; chart_backend_latency += now_monotonic_usec() - start_ut; } @@ -488,7 +469,7 @@ void *backends_main(void *ptr) { // ------------------------------------------------------------------------ // update the monitoring charts - if(chart_ops->counter_done) rrdset_next(chart_ops); + if(likely(chart_ops->counter_done)) rrdset_next(chart_ops); rrddim_set(chart_ops, "read", chart_receptions); rrddim_set(chart_ops, "write", chart_transmission_successes); rrddim_set(chart_ops, "discard", chart_data_lost_events); @@ -496,13 +477,13 @@ void *backends_main(void *ptr) { rrddim_set(chart_ops, "reconnect", chart_backend_reconnects); rrdset_done(chart_ops); - if(chart_metrics->counter_done) rrdset_next(chart_metrics); + if(likely(chart_metrics->counter_done)) rrdset_next(chart_metrics); rrddim_set(chart_metrics, "buffered", chart_buffered_metrics); rrddim_set(chart_metrics, "lost", chart_lost_metrics); rrddim_set(chart_metrics, "sent", chart_sent_metrics); rrdset_done(chart_metrics); - if(chart_bytes->counter_done) rrdset_next(chart_bytes); + if(likely(chart_bytes->counter_done)) rrdset_next(chart_bytes); rrddim_set(chart_bytes, "buffered", chart_buffered_bytes); rrddim_set(chart_bytes, "lost", chart_lost_bytes); rrddim_set(chart_bytes, "sent", chart_sent_bytes); @@ -510,13 +491,13 @@ void *backends_main(void *ptr) { rrdset_done(chart_bytes); /* - if(chart_latency->counter_done) rrdset_next(chart_latency); + if(likely(chart_latency->counter_done)) rrdset_next(chart_latency); rrddim_set(chart_latency, "latency", chart_backend_latency); rrdset_done(chart_latency); */ getrusage(RUSAGE_THREAD, &thread); - if(chart_rusage->counter_done) rrdset_next(chart_rusage); + if(likely(chart_rusage->counter_done)) rrdset_next(chart_rusage); rrddim_set(chart_rusage, "user", thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec); rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec); rrdset_done(chart_rusage);