3 // ----------------------------------------------------------------------------
4 // How backends work in netdata:
6 // 1. There is an independent thread that runs at the required interval
7 // (for example, once every 10 seconds)
9 // 2. Every time it wakes, it calls the backend formatting functions to build
10 // a buffer of data. This is a very fast, memory only operation.
12 // 3. If the buffer already includes data, the new data are appended.
13 // If the buffer becomes too big because the data cannot be sent, a
14 // log entry is written and the buffer is discarded.
16 // 4. Then it tries to send all the data. It blocks until all the data are sent
17 // or the socket returns an error.
18 // If the time required for this is above the interval, it starts skipping
19 // intervals, but the calculated values include the entire database, without
20 // gaps (it remembers the timestamps and continues from where it stopped).
22 // 5. Repeats the above forever.
// BACKEND_SOURCE_* select which values the backend exports. backends_main()
// maps the "data source" config option to exactly one of these flags and passes
// it to the formatter functions as their `options` argument.
25 #define BACKEND_SOURCE_DATA_AS_COLLECTED 0x00000001 // "as collected": the *_collected_* formatters send rd->last_collected_value
26 #define BACKEND_SOURCE_DATA_AVERAGE 0x00000002 // "average" (config default): sum / count of stored values in (after, before]
27 #define BACKEND_SOURCE_DATA_SUM 0x00000004 // "sum" or "volume": tested in backend_calculate_value_from_stored_data() before it averages
30 // ----------------------------------------------------------------------------
31 // helper functions for backends
33 // calculate the SUM or AVERAGE of a dimension, for any timeframe
34 // may return NAN if the database does not have any value in the given timeframe
36 static inline calculated_number backend_calculate_value_from_stored_data(
37 RRDSET *st // the chart
38 , RRDDIM *rd // the dimension
39 , time_t after // the start timestamp
40 , time_t before // the end timestamp
41 , uint32_t options // BACKEND_SOURCE_* bitmap
43 // find the edges of the rrd database for this chart
44 time_t first_t = rrdset_first_entry_t(st);
45 time_t last_t = rrdset_last_entry_t(st);
47 if(unlikely(before < first_t || after > last_t))
48 // the chart has not been updated in the wanted timeframe
51 // align the time-frame
52 // for 'after' also skip the first value by adding st->update_every
53 after = after - after % st->update_every + st->update_every;
54 before = before - before % st->update_every;
56 if(unlikely(after < first_t))
59 if(unlikely(after > before))
60 // this can happen when st->update_every > before - after
63 if(unlikely(before > last_t))
67 calculated_number sum = 0;
69 long start_at_slot = rrdset_time2slot(st, before),
70 stop_at_slot = rrdset_time2slot(st, after),
73 for(slot = start_at_slot; !stop_now ; slot--) {
75 if(unlikely(slot < 0)) slot = st->entries - 1;
76 if(unlikely(slot == stop_at_slot)) stop_now = 1;
78 storage_number n = rd->values[slot];
80 if(unlikely(!does_storage_number_exist(n))) {
85 calculated_number value = unpack_storage_number(n);
91 if(unlikely(!counter))
94 if(unlikely(options & BACKEND_SOURCE_DATA_SUM))
97 return sum / (calculated_number)counter;
101 // discard a response received by a backend
102 // after logging a sample of it to error.log
104 static inline int discard_response(BUFFER *b, const char *backend) {
106 const char *s = buffer_tostring(b);
107 char *d = sample, *e = &sample[sizeof(sample) - 1];
109 for(; *s && d < e ;s++) {
111 if(unlikely(!isprint(c))) c = ' ';
116 info("Received %zu bytes from %s backend. Ignoring them. Sample: '%s'", buffer_strlen(b), backend, sample);
122 // ----------------------------------------------------------------------------
125 static inline int format_dimension_collected_graphite_plaintext(
126 BUFFER *b // the buffer to write data to
127 , const char *prefix // the prefix to use
128 , RRDHOST *host // the host this chart comes from
129 , const char *hostname // the hostname (to override host->hostname)
130 , RRDSET *st // the chart
131 , RRDDIM *rd // the dimension
132 , time_t after // the start timestamp
133 , time_t before // the end timestamp
134 , uint32_t options // BACKEND_SOURCE_* bitmap
143 , "%s.%s.%s.%s " COLLECTED_NUMBER_FORMAT " %u\n"
148 , rd->last_collected_value
149 , (uint32_t)rd->last_collected_time.tv_sec
155 static inline int format_dimension_stored_graphite_plaintext(
156 BUFFER *b // the buffer to write data to
157 , const char *prefix // the prefix to use
158 , RRDHOST *host // the host this chart comes from
159 , const char *hostname // the hostname (to override host->hostname)
160 , RRDSET *st // the chart
161 , RRDDIM *rd // the dimension
162 , time_t after // the start timestamp
163 , time_t before // the end timestamp
164 , uint32_t options // BACKEND_SOURCE_* bitmap
168 calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
174 , "%s.%s.%s.%s " CALCULATED_NUMBER_FORMAT " %u\n"
188 static inline int process_graphite_response(BUFFER *b) {
189 return discard_response(b, "graphite");
193 // ----------------------------------------------------------------------------
196 static inline int format_dimension_collected_opentsdb_telnet(
197 BUFFER *b // the buffer to write data to
198 , const char *prefix // the prefix to use
199 , RRDHOST *host // the host this chart comes from
200 , const char *hostname // the hostname (to override host->hostname)
201 , RRDSET *st // the chart
202 , RRDDIM *rd // the dimension
203 , time_t after // the start timestamp
204 , time_t before // the end timestamp
205 , uint32_t options // BACKEND_SOURCE_* bitmap
214 , "put %s.%s.%s %u " COLLECTED_NUMBER_FORMAT " host=%s\n"
218 , (uint32_t)rd->last_collected_time.tv_sec
219 , rd->last_collected_value
226 static inline int format_dimension_stored_opentsdb_telnet(
227 BUFFER *b // the buffer to write data to
228 , const char *prefix // the prefix to use
229 , RRDHOST *host // the host this chart comes from
230 , const char *hostname // the hostname (to override host->hostname)
231 , RRDSET *st // the chart
232 , RRDDIM *rd // the dimension
233 , time_t after // the start timestamp
234 , time_t before // the end timestamp
235 , uint32_t options // BACKEND_SOURCE_* bitmap
239 calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
245 , "put %s.%s.%s %u " CALCULATED_NUMBER_FORMAT " host=%s\n"
259 static inline int process_opentsdb_response(BUFFER *b) {
260 return discard_response(b, "opentsdb");
264 // ----------------------------------------------------------------------------
267 static inline int format_dimension_collected_json_plaintext(
268 BUFFER *b // the buffer to write data to
269 , const char *prefix // the prefix to use
270 , RRDHOST *host // the host this chart comes from
271 , const char *hostname // the hostname (to override host->hostname)
272 , RRDSET *st // the chart
273 , RRDDIM *rd // the dimension
274 , time_t after // the start timestamp
275 , time_t before // the end timestamp
276 , uint32_t options // BACKEND_SOURCE_* bitmap
283 buffer_sprintf(b, "{"
285 "\"hostname\":\"%s\","
287 "\"chart_id\":\"%s\","
288 "\"chart_name\":\"%s\","
289 "\"chart_family\":\"%s\","
290 "\"chart_context\": \"%s\","
291 "\"chart_type\":\"%s\","
296 "\"value\":" COLLECTED_NUMBER_FORMAT ","
298 "\"timestamp\": %u}\n",
311 rd->last_collected_value,
313 (uint32_t)rd->last_collected_time.tv_sec
319 static inline int format_dimension_stored_json_plaintext(
320 BUFFER *b // the buffer to write data to
321 , const char *prefix // the prefix to use
322 , RRDHOST *host // the host this chart comes from
323 , const char *hostname // the hostname (to override host->hostname)
324 , RRDSET *st // the chart
325 , RRDDIM *rd // the dimension
326 , time_t after // the start timestamp
327 , time_t before // the end timestamp
328 , uint32_t options // BACKEND_SOURCE_* bitmap
332 calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
335 buffer_sprintf(b, "{"
337 "\"hostname\":\"%s\","
339 "\"chart_id\":\"%s\","
340 "\"chart_name\":\"%s\","
341 "\"chart_family\":\"%s\","
342 "\"chart_context\": \"%s\","
343 "\"chart_type\":\"%s\","
348 "\"value\":" CALCULATED_NUMBER_FORMAT ","
350 "\"timestamp\": %u}\n",
373 static inline int process_json_response(BUFFER *b) {
374 return discard_response(b, "json");
378 // ----------------------------------------------------------------------------
379 // the backend thread
381 void *backends_main(void *ptr) {
382 int default_port = 0;
384 struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
386 BUFFER *b = buffer_create(1), *response = buffer_create(1);
387 int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, uint32_t) = NULL;
388 int (*backend_response_checker)(BUFFER *) = NULL;
390 info("BACKEND thread created with task id %d", gettid());
392 if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
393 error("Cannot set pthread cancel type to DEFERRED.");
395 if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
396 error("Cannot set pthread cancel state to ENABLE.");
398 // ------------------------------------------------------------------------
399 // collect configuration options
401 struct timeval timeout = {
406 int enabled = config_get_boolean(CONFIG_SECTION_BACKEND, "enabled", 0);
407 const char *source = config_get(CONFIG_SECTION_BACKEND, "data source", "average");
408 const char *type = config_get(CONFIG_SECTION_BACKEND, "type", "graphite");
409 const char *destination = config_get(CONFIG_SECTION_BACKEND, "destination", "localhost");
410 const char *prefix = config_get(CONFIG_SECTION_BACKEND, "prefix", "netdata");
411 const char *hostname = config_get(CONFIG_SECTION_BACKEND, "hostname", localhost->hostname);
412 int frequency = (int)config_get_number(CONFIG_SECTION_BACKEND, "update every", 10);
413 int buffer_on_failures = (int)config_get_number(CONFIG_SECTION_BACKEND, "buffer on failures", 10);
414 long timeoutms = config_get_number(CONFIG_SECTION_BACKEND, "timeout ms", frequency * 2 * 1000);
416 // ------------------------------------------------------------------------
417 // validate configuration options
418 // and prepare for sending data to our backend
420 if(!enabled || frequency < 1)
423 if(!strcmp(source, "as collected")) {
424 options = BACKEND_SOURCE_DATA_AS_COLLECTED;
426 else if(!strcmp(source, "average")) {
427 options = BACKEND_SOURCE_DATA_AVERAGE;
429 else if(!strcmp(source, "sum") || !strcmp(source, "volume")) {
430 options = BACKEND_SOURCE_DATA_SUM;
433 error("Invalid data source method '%s' for backend given. Disabling backed.", source);
438 error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000);
439 timeoutms = frequency * 2 * 1000;
441 timeout.tv_sec = (timeoutms * 1000) / 1000000;
442 timeout.tv_usec = (timeoutms * 1000) % 1000000;
445 // ------------------------------------------------------------------------
446 // select the backend type
448 if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
451 backend_response_checker = process_graphite_response;
453 if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
454 backend_request_formatter = format_dimension_collected_graphite_plaintext;
456 backend_request_formatter = format_dimension_stored_graphite_plaintext;
459 else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
462 backend_response_checker = process_opentsdb_response;
464 if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
465 backend_request_formatter = format_dimension_collected_opentsdb_telnet;
467 backend_request_formatter = format_dimension_stored_opentsdb_telnet;
470 else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
473 backend_response_checker = process_json_response;
475 if (options == BACKEND_SOURCE_DATA_AS_COLLECTED)
476 backend_request_formatter = format_dimension_collected_json_plaintext;
478 backend_request_formatter = format_dimension_stored_json_plaintext;
482 error("Unknown backend type '%s'", type);
486 if(backend_request_formatter == NULL || backend_response_checker == NULL) {
487 error("backend is misconfigured - disabling it.");
492 // ------------------------------------------------------------------------
493 // prepare the charts for monitoring the backend operation
495 struct rusage thread;
498 chart_buffered_metrics = 0,
499 chart_lost_metrics = 0,
500 chart_sent_metrics = 0,
501 chart_buffered_bytes = 0,
502 chart_received_bytes = 0,
503 chart_sent_bytes = 0,
504 chart_receptions = 0,
505 chart_transmission_successes = 0,
506 chart_transmission_failures = 0,
507 chart_data_lost_events = 0,
508 chart_lost_bytes = 0,
509 chart_backend_reconnects = 0,
510 chart_backend_latency = 0;
512 RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", 130600, frequency, RRDSET_TYPE_LINE);
513 rrddim_add(chart_metrics, "buffered", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
514 rrddim_add(chart_metrics, "lost", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
515 rrddim_add(chart_metrics, "sent", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
517 RRDSET *chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KB", 130610, frequency, RRDSET_TYPE_AREA);
518 rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
519 rrddim_add(chart_bytes, "lost", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
520 rrddim_add(chart_bytes, "sent", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
521 rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
523 RRDSET *chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", 130630, frequency, RRDSET_TYPE_LINE);
524 rrddim_add(chart_ops, "write", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
525 rrddim_add(chart_ops, "discard", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
526 rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
527 rrddim_add(chart_ops, "failure", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
528 rrddim_add(chart_ops, "read", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
531 * this is misleading - we can only measure the time we need to send data
532 * this time is not related to the time required for the data to travel to
533 * the backend database and the time that server needed to process them
535 * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
537 RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA);
538 rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
541 RRDSET *chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency, RRDSET_TYPE_STACKED);
542 rrddim_add(chart_rusage, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
543 rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
546 // ------------------------------------------------------------------------
547 // prepare the backend main loop
549 info("BACKEND configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, frequency, hostname, prefix);
551 usec_t step_ut = frequency * USEC_PER_SEC;
552 time_t after = now_realtime_sec();
559 // ------------------------------------------------------------------------
560 // Wait for the next iteration point.
561 heartbeat_next(&hb, step_ut);
562 time_t before = now_realtime_sec();
565 // ------------------------------------------------------------------------
566 // add to the buffer the data we need to send to the backend
568 int pthreadoldcancelstate;
570 if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
571 error("Cannot set pthread cancel state to DISABLE.");
575 rrdhost_foreach_read(host) {
576 if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE)
579 rrdhost_rdlock(host);
582 rrdset_foreach_read(st, host) {
586 rrddim_foreach_read(rd, st) {
587 if(rd->last_collected_time.tv_sec >= after)
588 chart_buffered_metrics += backend_request_formatter(b, prefix, host, (host == localhost)?hostname:host->hostname, st, rd, after, before, options);
592 rrdhost_unlock(host);
596 if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
597 error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
599 // ------------------------------------------------------------------------
601 chart_buffered_bytes = (collected_number)buffer_strlen(b);
603 // reset the monitoring chart counters
604 chart_received_bytes =
608 chart_transmission_successes =
609 chart_transmission_failures =
610 chart_data_lost_events =
612 chart_backend_reconnects =
613 chart_backend_latency = 0;
615 if(unlikely(netdata_exit)) break;
617 //fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b)); // FIXME
618 //fprintf(stderr, "after = %lu, before = %lu\n", after, before);
620 // prepare for the next iteration
621 // to add incrementally data to buffer
624 // ------------------------------------------------------------------------
625 // if we are connected, receive a response, without blocking
627 if(likely(sock != -1)) {
630 // loop through to collect all data
631 while(sock != -1 && errno != EWOULDBLOCK) {
632 buffer_need_bytes(response, 4096);
634 ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
636 // we received some data
638 chart_received_bytes += r;
642 error("Backend '%s' closed the socket", destination);
647 // failed to receive data
648 if(errno != EAGAIN && errno != EWOULDBLOCK) {
649 error("Cannot receive data from backend '%s'.", destination);
654 // if we received data, process them
655 if(buffer_strlen(response))
656 backend_response_checker(response);
659 // ------------------------------------------------------------------------
660 // if we are not connected, connect to a backend server
662 if(unlikely(sock == -1)) {
663 usec_t start_ut = now_monotonic_usec();
664 size_t reconnects = 0;
666 sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
668 chart_backend_reconnects += reconnects;
669 chart_backend_latency += now_monotonic_usec() - start_ut;
672 if(unlikely(netdata_exit)) break;
674 // ------------------------------------------------------------------------
675 // if we are connected, send our buffer to the backend server
677 if(likely(sock != -1)) {
678 size_t len = buffer_strlen(b);
679 usec_t start_ut = now_monotonic_usec();
682 flags += MSG_NOSIGNAL;
685 ssize_t written = send(sock, buffer_tostring(b), len, flags);
686 chart_backend_latency += now_monotonic_usec() - start_ut;
687 if(written != -1 && (size_t)written == len) {
688 // we sent the data successfully
689 chart_transmission_successes++;
690 chart_sent_bytes += written;
691 chart_sent_metrics = chart_buffered_metrics;
693 // reset the failures count
700 // oops! we couldn't send (all or some of the) data
701 error("Failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
702 chart_transmission_failures++;
705 chart_sent_bytes += written;
707 // increment the counter we check for data loss
710 // close the socket - we will re-open it next time
716 error("Failed to update database backend '%s'", destination);
717 chart_transmission_failures++;
719 // increment the counter we check for data loss
723 if(failures > buffer_on_failures) {
724 // too bad! we are going to lose data
725 chart_lost_bytes += buffer_strlen(b);
726 error("Reached %d backend failures. Flushing buffers to protect this host - this results in data loss on back-end server '%s'", failures, destination);
729 chart_data_lost_events++;
730 chart_lost_metrics = chart_buffered_metrics;
733 if(unlikely(netdata_exit)) break;
735 // ------------------------------------------------------------------------
736 // update the monitoring charts
738 if(likely(chart_ops->counter_done)) rrdset_next(chart_ops);
739 rrddim_set(chart_ops, "read", chart_receptions);
740 rrddim_set(chart_ops, "write", chart_transmission_successes);
741 rrddim_set(chart_ops, "discard", chart_data_lost_events);
742 rrddim_set(chart_ops, "failure", chart_transmission_failures);
743 rrddim_set(chart_ops, "reconnect", chart_backend_reconnects);
744 rrdset_done(chart_ops);
746 if(likely(chart_metrics->counter_done)) rrdset_next(chart_metrics);
747 rrddim_set(chart_metrics, "buffered", chart_buffered_metrics);
748 rrddim_set(chart_metrics, "lost", chart_lost_metrics);
749 rrddim_set(chart_metrics, "sent", chart_sent_metrics);
750 rrdset_done(chart_metrics);
752 if(likely(chart_bytes->counter_done)) rrdset_next(chart_bytes);
753 rrddim_set(chart_bytes, "buffered", chart_buffered_bytes);
754 rrddim_set(chart_bytes, "lost", chart_lost_bytes);
755 rrddim_set(chart_bytes, "sent", chart_sent_bytes);
756 rrddim_set(chart_bytes, "received", chart_received_bytes);
757 rrdset_done(chart_bytes);
760 if(likely(chart_latency->counter_done)) rrdset_next(chart_latency);
761 rrddim_set(chart_latency, "latency", chart_backend_latency);
762 rrdset_done(chart_latency);
765 getrusage(RUSAGE_THREAD, &thread);
766 if(likely(chart_rusage->counter_done)) rrdset_next(chart_rusage);
767 rrddim_set(chart_rusage, "user", thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
768 rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
769 rrdset_done(chart_rusage);
771 if(likely(buffer_strlen(b) == 0))
772 chart_buffered_metrics = 0;
774 if(unlikely(netdata_exit)) break;
782 buffer_free(response);
784 info("BACKEND thread exiting");
786 static_thread->enabled = 0;