X-Git-Url: https://arthur.barton.de/gitweb/?a=blobdiff_plain;f=src%2Fbackends.c;h=a2c25e94bdb70cbc758d485a3de682451a744a02;hb=9dccc16d3763392f0b13349de18c3a838a667653;hp=94e8f5f671802c387d33192f86838ea40df7fe72;hpb=719ba47914bad933bf4503d3340b9543d5e631a6;p=netdata.git diff --git a/src/backends.c b/src/backends.c index 94e8f5f6..a2c25e94 100644 --- a/src/backends.c +++ b/src/backends.c @@ -1,12 +1,48 @@ #include "common.h" +// ---------------------------------------------------------------------------- +// How backends work in netdata: +// +// 1. There is an independent thread that runs at the required interval +// (for example, once every 10 seconds) +// +// 2. Every time it wakes, it calls the backend formatting functions to build +// a buffer of data. This is a very fast, memory only operation. +// +// 3. If the buffer already includes data, the new data are appended. +// If the buffer becomes too big, because the data cannot be sent, a +// log is written and the buffer is discarded. +// +// 4. Then it tries to send all the data. It blocks until all the data are sent +// or the socket returns an error. +// If the time required for this is above the interval, it starts skipping +// intervals, but the calculated values include the entire database, without +// gaps (it remembers the timestamps and continues from where it stopped). +// +// 5. repeats the above forever. 
+// + #define BACKEND_SOURCE_DATA_AS_COLLECTED 0x00000001 #define BACKEND_SOURCE_DATA_AVERAGE 0x00000002 #define BACKEND_SOURCE_DATA_SUM 0x00000004 -static inline calculated_number backend_calculate_value_from_stored_data(RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) { + +// ---------------------------------------------------------------------------- +// helper functions for backends + +// calculate the SUM or AVERAGE of a dimension, for any timeframe +// may return NAN if the database does not have any value in the given timeframe + +static inline calculated_number backend_calculate_value_from_stored_data( + RRDSET *st // the chart + , RRDDIM *rd // the dimension + , time_t after // the start timestamp + , time_t before // the end timestamp + , uint32_t options // BACKEND_SOURCE_* bitmap +) { + // find the edges of the rrd database for this chart time_t first_t = rrdset_first_entry_t(st); - time_t last_t = rrdset_last_entry_t(st); + time_t last_t = rrdset_last_entry_t(st); if(unlikely(before < first_t || after > last_t)) // the chart has not been updated in the wanted timeframe @@ -21,7 +57,7 @@ static inline calculated_number backend_calculate_value_from_stored_data(RRDSET after = first_t; if(unlikely(after > before)) - // this can happen when the st->update_every > before - after + // this can happen when st->update_every > before - after before = after; if(unlikely(before > last_t)) @@ -35,14 +71,20 @@ static inline calculated_number backend_calculate_value_from_stored_data(RRDSET slot, stop_now = 0; for(slot = start_at_slot; !stop_now ; slot--) { + if(unlikely(slot < 0)) slot = st->entries - 1; if(unlikely(slot == stop_at_slot)) stop_now = 1; storage_number n = rd->values[slot]; - if(unlikely(!does_storage_number_exist(n))) continue; + + if(unlikely(!does_storage_number_exist(n))) { + // not collected + continue; + } calculated_number value = unpack_storage_number(n); sum += value; + counter++; } @@ -55,78 +97,287 @@ static inline
calculated_number backend_calculate_value_from_stored_data(RRDSET return sum / (calculated_number)counter; } -static inline int format_dimension_collected_graphite_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) { + +// discard a response received by a backend +// after logging a sample of it to error.log + +static inline int discard_response(BUFFER *b, const char *backend) { + char sample[1024]; + const char *s = buffer_tostring(b); + char *d = sample, *e = &sample[sizeof(sample) - 1]; + + for(; *s && d < e ;s++) { + char c = *s; + if(unlikely(!isprint(c))) c = ' '; + *d++ = c; + } + *d = '\0'; + + info("Received %zu bytes from %s backend. Ignoring them. Sample: '%s'", buffer_strlen(b), backend, sample); + buffer_flush(b); + return 0; +} + + +// ---------------------------------------------------------------------------- +// graphite backend + +static inline int format_dimension_collected_graphite_plaintext( + BUFFER *b // the buffer to write data to + , const char *prefix // the prefix to use + , RRDHOST *host // the host this chart comes from + , const char *hostname // the hostname (to override host->hostname) + , RRDSET *st // the chart + , RRDDIM *rd // the dimension + , time_t after // the start timestamp + , time_t before // the end timestamp + , uint32_t options // BACKEND_SOURCE_* bitmap +) { (void)host; (void)after; (void)before; (void)options; - buffer_sprintf(b, "%s.%s.%s.%s " COLLECTED_NUMBER_FORMAT " %u\n", prefix, hostname, st->id, rd->id, rd->last_collected_value, (uint32_t)rd->last_collected_time.tv_sec); + + buffer_sprintf( + b + , "%s.%s.%s.%s " COLLECTED_NUMBER_FORMAT " %u\n" + , prefix + , hostname + , st->id + , rd->id + , rd->last_collected_value + , (uint32_t)rd->last_collected_time.tv_sec + ); + return 1; } -static inline int format_dimension_stored_graphite_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname,
RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) { +static inline int format_dimension_stored_graphite_plaintext( + BUFFER *b // the buffer to write data to + , const char *prefix // the prefix to use + , RRDHOST *host // the host this chart comes from + , const char *hostname // the hostname (to override host->hostname) + , RRDSET *st // the chart + , RRDDIM *rd // the dimension + , time_t after // the start timestamp + , time_t before // the end timestamp + , uint32_t options // BACKEND_SOURCE_* bitmap +) { (void)host; + calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options); + if(!isnan(value)) { - buffer_sprintf(b, "%s.%s.%s.%s " CALCULATED_NUMBER_FORMAT " %u\n", prefix, hostname, st->id, rd->id, value, (uint32_t) before); + + buffer_sprintf( + b + , "%s.%s.%s.%s " CALCULATED_NUMBER_FORMAT " %u\n" + , prefix + , hostname + , st->id + , rd->id + , value + , (uint32_t) before + ); + return 1; } return 0; } -static inline int format_dimension_collected_opentsdb_telnet(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) { +static inline int process_graphite_response(BUFFER *b) { + return discard_response(b, "graphite"); +} + + +// ---------------------------------------------------------------------------- +// opentsdb backend + +static inline int format_dimension_collected_opentsdb_telnet( + BUFFER *b // the buffer to write data to + , const char *prefix // the prefix to use + , RRDHOST *host // the host this chart comes from + , const char *hostname // the hostname (to override host->hostname) + , RRDSET *st // the chart + , RRDDIM *rd // the dimension + , time_t after // the start timestamp + , time_t before // the end timestamp + , uint32_t options // BACKEND_SOURCE_* bitmap +) { (void)host; (void)after; (void)before; (void)options; - buffer_sprintf(b, "put %s.%s.%s %u " COLLECTED_NUMBER_FORMAT " host=%s\n", 
prefix, st->id, rd->id, (uint32_t)rd->last_collected_time.tv_sec, rd->last_collected_value, hostname); + + buffer_sprintf( + b + , "put %s.%s.%s %u " COLLECTED_NUMBER_FORMAT " host=%s\n" + , prefix + , st->id + , rd->id + , (uint32_t)rd->last_collected_time.tv_sec + , rd->last_collected_value + , hostname + ); + return 1; } -static inline int format_dimension_stored_opentsdb_telnet(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) { +static inline int format_dimension_stored_opentsdb_telnet( + BUFFER *b // the buffer to write data to + , const char *prefix // the prefix to use + , RRDHOST *host // the host this chart comes from + , const char *hostname // the hostname (to override host->hostname) + , RRDSET *st // the chart + , RRDDIM *rd // the dimension + , time_t after // the start timestamp + , time_t before // the end timestamp + , uint32_t options // BACKEND_SOURCE_* bitmap +) { (void)host; + calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options); + if(!isnan(value)) { - buffer_sprintf(b, "put %s.%s.%s %u " CALCULATED_NUMBER_FORMAT " host=%s\n", prefix, st->id, rd->id, (uint32_t) before, value, hostname); + + buffer_sprintf( + b + , "put %s.%s.%s %u " CALCULATED_NUMBER_FORMAT " host=%s\n" + , prefix + , st->id + , rd->id + , (uint32_t) before + , value + , hostname + ); + return 1; } return 0; } -static inline int process_graphite_response(BUFFER *b) { - char sample[1024]; - const char *s = buffer_tostring(b); - char *d = sample, *e = &sample[sizeof(sample) - 1]; +static inline int process_opentsdb_response(BUFFER *b) { + return discard_response(b, "opentsdb"); +} - for(; *s && d < e ;s++) { - char c = *s; - if(unlikely(!isprint(c))) c = ' '; - *d++ = c; - } - *d = '\0'; - info("Received %zu bytes from graphite backend. Ignoring them. 
Sample: '%s'", buffer_strlen(b), sample); - buffer_flush(b); - return 0; +// ---------------------------------------------------------------------------- +// json backend + +static inline int format_dimension_collected_json_plaintext( + BUFFER *b // the buffer to write data to + , const char *prefix // the prefix to use + , RRDHOST *host // the host this chart comes from + , const char *hostname // the hostname (to override host->hostname) + , RRDSET *st // the chart + , RRDDIM *rd // the dimension + , time_t after // the start timestamp + , time_t before // the end timestamp + , uint32_t options // BACKEND_SOURCE_* bitmap +) { + (void)host; + (void)after; + (void)before; + (void)options; + + buffer_sprintf(b, "{" + "\"prefix\":\"%s\"," + "\"hostname\":\"%s\"," + + "\"chart_id\":\"%s\"," + "\"chart_name\":\"%s\"," + "\"chart_family\":\"%s\"," + "\"chart_context\": \"%s\"," + "\"chart_type\":\"%s\"," + "\"units\": \"%s\"," + + "\"id\":\"%s\"," + "\"name\":\"%s\"," + "\"value\":" COLLECTED_NUMBER_FORMAT "," + + "\"timestamp\": %u}\n", + prefix, + hostname, + + st->id, + st->name, + st->family, + st->context, + st->type, + st->units, + + rd->id, + rd->name, + rd->last_collected_value, + + (uint32_t)rd->last_collected_time.tv_sec + ); + + return 1; } -static inline int process_opentsdb_response(BUFFER *b) { - char sample[1024]; - const char *s = buffer_tostring(b); - char *d = sample, *e = &sample[sizeof(sample) - 1]; +static inline int format_dimension_stored_json_plaintext( + BUFFER *b // the buffer to write data to + , const char *prefix // the prefix to use + , RRDHOST *host // the host this chart comes from + , const char *hostname // the hostname (to override host->hostname) + , RRDSET *st // the chart + , RRDDIM *rd // the dimension + , time_t after // the start timestamp + , time_t before // the end timestamp + , uint32_t options // BACKEND_SOURCE_* bitmap +) { + (void)host; - for(; *s && d < e ;s++) { - char c = *s; - if(unlikely(!isprint(c))) c = ' '; - *d++ 
= c; - } - *d = '\0'; + calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options); - info("Received %zu bytes from opentsdb backend. Ignoring them. Sample: '%s'", buffer_strlen(b), sample); - buffer_flush(b); + if(!isnan(value)) { + buffer_sprintf(b, "{" + "\"prefix\":\"%s\"," + "\"hostname\":\"%s\"," + + "\"chart_id\":\"%s\"," + "\"chart_name\":\"%s\"," + "\"chart_family\":\"%s\"," + "\"chart_context\": \"%s\"," + "\"chart_type\":\"%s\"," + "\"units\": \"%s\"," + + "\"id\":\"%s\"," + "\"name\":\"%s\"," + "\"value\":" CALCULATED_NUMBER_FORMAT "," + + "\"timestamp\": %u}\n", + prefix, + hostname, + + st->id, + st->name, + st->family, + st->context, + st->type, + st->units, + + rd->id, + rd->name, + value, + + (uint32_t)before + ); + + return 1; + } return 0; } +static inline int process_json_response(BUFFER *b) { + return discard_response(b, "json"); +} + + +// ---------------------------------------------------------------------------- +// the backend thread + void *backends_main(void *ptr) { int default_port = 0; int sock = -1; @@ -195,22 +446,37 @@ void *backends_main(void *ptr) { // select the backend type if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) { + default_port = 2003; + backend_response_checker = process_graphite_response; + if(options == BACKEND_SOURCE_DATA_AS_COLLECTED) backend_request_formatter = format_dimension_collected_graphite_plaintext; else backend_request_formatter = format_dimension_stored_graphite_plaintext; - backend_response_checker = process_graphite_response; } else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) { + default_port = 4242; + backend_response_checker = process_opentsdb_response; + if(options == BACKEND_SOURCE_DATA_AS_COLLECTED) backend_request_formatter = format_dimension_collected_opentsdb_telnet; else backend_request_formatter = format_dimension_stored_opentsdb_telnet; - backend_response_checker = process_opentsdb_response; + } + else if 
(!strcmp(type, "json") || !strcmp(type, "json:plaintext")) { + + default_port = 5448; + backend_response_checker = process_json_response; + + if (options == BACKEND_SOURCE_DATA_AS_COLLECTED) + backend_request_formatter = format_dimension_collected_json_plaintext; + else + backend_request_formatter = format_dimension_stored_json_plaintext; + } else { error("Unknown backend type '%s'", type); @@ -304,29 +570,57 @@ void *backends_main(void *ptr) { if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0)) error("Cannot set pthread cancel state to DISABLE."); + size_t count_hosts = 0; + size_t count_charts_total = 0; + size_t count_dims_total = 0; + rrd_rdlock(); RRDHOST *host; rrdhost_foreach_read(host) { - if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) + if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) { + debug(D_BACKEND, "BACKEND: not sending host '%s' because its memory mode is '%s'", host->hostname, rrd_memory_mode_name(host->rrd_memory_mode)); continue; + } rrdhost_rdlock(host); + count_hosts++; + size_t count_charts = 0; + size_t count_dims = 0; + size_t count_dims_skipped = 0; + + const char *__hostname = (host == localhost)?hostname:host->hostname; + RRDSET *st; rrdset_foreach_read(st, host) { rrdset_rdlock(st); + count_charts++; + RRDDIM *rd; rrddim_foreach_read(rd, st) { - if(rd->last_collected_time.tv_sec >= after) - chart_buffered_metrics += backend_request_formatter(b, prefix, host, (host == localhost)?hostname:host->hostname, st, rd, after, before, options); + if(likely(rd->last_collected_time.tv_sec >= after)) { + chart_buffered_metrics += backend_request_formatter(b, prefix, host, __hostname, st, rd, after, before, options); + count_dims++; + } + else { + debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection is not within our timeframe", rd->id, st->id, __hostname); + count_dims_skipped++; + } } rrdset_unlock(st); } + + debug(D_BACKEND, "BACKEND: sending host 
'%s', metrics of %zu dimensions, of %zu charts. Skipped %zu dimensions.", __hostname, count_dims, count_charts, count_dims_skipped); + count_charts_total += count_charts; + count_dims_total += count_dims; + rrdhost_unlock(host); } rrd_unlock(); + debug(D_BACKEND, "BACKEND: buffer has %zu bytes, added metrics for %zu dimensions, of %zu charts, from %zu hosts", buffer_strlen(b), count_dims_total, count_charts_total, count_hosts); + if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0)) error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);