]> arthur.barton.de Git - netdata.git/blob - src/backends.c
3e385cab504bb46a91f1369d444986922baffa84
[netdata.git] / src / backends.c
1 #include "common.h"
2
3 // ----------------------------------------------------------------------------
4 // How backends work in netdata:
5 //
6 // 1. There is an independent thread that runs at the required interval
7 //    (for example, once every 10 seconds)
8 //
9 // 2. Every time it wakes, it calls the backend formatting functions to build
10 //    a buffer of data. This is a very fast, memory only operation.
11 //
12 // 3. If the buffer already includes data, the new data are appended.
13 //    If the buffer becomes too big, because the data cannot be sent, a
14 //    log is written and the buffer is discarded.
15 //
16 // 4. Then it tries to send all the data. It blocks until all the data are sent
17 //    or the socket returns an error.
18 //    If the time required for this is above the interval, it starts skipping
19 //    intervals, but the calculated values include the entire database, without
20 //    gaps (it remembers the timestamps and continues from where it stopped).
21 //
22 // 5. repeats the above forever.
23 //
24
25 #define BACKEND_SOURCE_DATA_AS_COLLECTED 0x00000001
26 #define BACKEND_SOURCE_DATA_AVERAGE      0x00000002
27 #define BACKEND_SOURCE_DATA_SUM          0x00000004
28
29
30 // ----------------------------------------------------------------------------
31 // helper functions for backends
32
33 // calculate the SUM or AVERAGE of a dimension, for any timeframe
34 // may return NAN if the database does not have any value in the give timeframe
35
36 static inline calculated_number backend_calculate_value_from_stored_data(
37           RRDSET *st                // the chart
38         , RRDDIM *rd                // the dimension
39         , time_t after              // the start timestamp
40         , time_t before             // the end timestamp
41         , uint32_t options          // BACKEND_SOURCE_* bitmap
42 ) {
43     // find the edges of the rrd database for this chart
44     time_t first_t = rrdset_first_entry_t(st);
45     time_t last_t  = rrdset_last_entry_t(st);
46
47     if(unlikely(before < first_t || after > last_t))
48         // the chart has not been updated in the wanted timeframe
49         return NAN;
50
51     // align the time-frame
52     // for 'after' also skip the first value by adding st->update_every
53     after  = after  - after  % st->update_every + st->update_every;
54     before = before - before % st->update_every;
55
56     if(unlikely(after < first_t))
57         after = first_t;
58
59     if(unlikely(after > before))
60         // this can happen when st->update_every > before - after
61         before = after;
62
63     if(unlikely(before > last_t))
64         before = last_t;
65
66     size_t counter = 0;
67     calculated_number sum = 0;
68
69     long    start_at_slot = rrdset_time2slot(st, before),
70             stop_at_slot  = rrdset_time2slot(st, after),
71             slot, stop_now = 0;
72
73     for(slot = start_at_slot; !stop_now ; slot--) {
74
75         if(unlikely(slot < 0)) slot = st->entries - 1;
76         if(unlikely(slot == stop_at_slot)) stop_now = 1;
77
78         storage_number n = rd->values[slot];
79
80         if(unlikely(!does_storage_number_exist(n))) {
81             // not collected
82             continue;
83         }
84
85         calculated_number value = unpack_storage_number(n);
86         sum += value;
87
88         counter++;
89     }
90
91     if(unlikely(!counter))
92         return NAN;
93
94     if(unlikely(options & BACKEND_SOURCE_DATA_SUM))
95         return sum;
96
97     return sum / (calculated_number)counter;
98 }
99
100
101 // discard a response received by a backend
102 // after logging a simple of it to error.log
103
104 static inline int discard_response(BUFFER *b, const char *backend) {
105     char sample[1024];
106     const char *s = buffer_tostring(b);
107     char *d = sample, *e = &sample[sizeof(sample) - 1];
108
109     for(; *s && d < e ;s++) {
110         char c = *s;
111         if(unlikely(!isprint(c))) c = ' ';
112         *d++ = c;
113     }
114     *d = '\0';
115
116     info("Received %zu bytes from %s backend. Ignoring them. Sample: '%s'", buffer_strlen(b), backend, sample);
117     buffer_flush(b);
118     return 0;
119 }
120
121
122 // ----------------------------------------------------------------------------
123 // graphite backend
124
125 static inline int format_dimension_collected_graphite_plaintext(
126           BUFFER *b                 // the buffer to write data to
127         , const char *prefix        // the prefix to use
128         , RRDHOST *host             // the host this chart comes from
129         , const char *hostname      // the hostname (to override host->hostname)
130         , RRDSET *st                // the chart
131         , RRDDIM *rd                // the dimension
132         , time_t after              // the start timestamp
133         , time_t before             // the end timestamp
134         , uint32_t options          // BACKEND_SOURCE_* bitmap
135 ) {
136     (void)host;
137     (void)after;
138     (void)before;
139     (void)options;
140
141     buffer_sprintf(
142             b
143             , "%s.%s.%s.%s " COLLECTED_NUMBER_FORMAT " %u\n"
144             , prefix
145             , hostname
146             , st->id
147             , rd->id
148             , rd->last_collected_value
149             , (uint32_t)rd->last_collected_time.tv_sec
150     );
151
152     return 1;
153 }
154
155 static inline int format_dimension_stored_graphite_plaintext(
156           BUFFER *b                 // the buffer to write data to
157         , const char *prefix        // the prefix to use
158         , RRDHOST *host             // the host this chart comes from
159         , const char *hostname      // the hostname (to override host->hostname)
160         , RRDSET *st                // the chart
161         , RRDDIM *rd                // the dimension
162         , time_t after              // the start timestamp
163         , time_t before             // the end timestamp
164         , uint32_t options          // BACKEND_SOURCE_* bitmap
165 ) {
166     (void)host;
167
168     calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
169
170     if(!isnan(value)) {
171
172         buffer_sprintf(
173                 b
174                 , "%s.%s.%s.%s " CALCULATED_NUMBER_FORMAT " %u\n"
175                 , prefix
176                 , hostname
177                 , st->id
178                 , rd->id
179                 , value
180                 , (uint32_t) before
181         );
182
183         return 1;
184     }
185     return 0;
186 }
187
188 static inline int process_graphite_response(BUFFER *b) {
189     return discard_response(b, "graphite");
190 }
191
192
193 // ----------------------------------------------------------------------------
194 // opentsdb backend
195
196 static inline int format_dimension_collected_opentsdb_telnet(
197           BUFFER *b                 // the buffer to write data to
198         , const char *prefix        // the prefix to use
199         , RRDHOST *host             // the host this chart comes from
200         , const char *hostname      // the hostname (to override host->hostname)
201         , RRDSET *st                // the chart
202         , RRDDIM *rd                // the dimension
203         , time_t after              // the start timestamp
204         , time_t before             // the end timestamp
205         , uint32_t options          // BACKEND_SOURCE_* bitmap
206 ) {
207     (void)host;
208     (void)after;
209     (void)before;
210     (void)options;
211
212     buffer_sprintf(
213             b
214             , "put %s.%s.%s %u " COLLECTED_NUMBER_FORMAT " host=%s\n"
215             , prefix
216             , st->id
217             , rd->id
218             , (uint32_t)rd->last_collected_time.tv_sec
219             , rd->last_collected_value
220             , hostname
221     );
222
223     return 1;
224 }
225
226 static inline int format_dimension_stored_opentsdb_telnet(
227           BUFFER *b                 // the buffer to write data to
228         , const char *prefix        // the prefix to use
229         , RRDHOST *host             // the host this chart comes from
230         , const char *hostname      // the hostname (to override host->hostname)
231         , RRDSET *st                // the chart
232         , RRDDIM *rd                // the dimension
233         , time_t after              // the start timestamp
234         , time_t before             // the end timestamp
235         , uint32_t options          // BACKEND_SOURCE_* bitmap
236 ) {
237     (void)host;
238
239     calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
240
241     if(!isnan(value)) {
242
243         buffer_sprintf(
244                 b
245                 , "put %s.%s.%s %u " CALCULATED_NUMBER_FORMAT " host=%s\n"
246                 , prefix
247                 , st->id
248                 , rd->id
249                 , (uint32_t) before
250                 , value
251                 , hostname
252         );
253
254         return 1;
255     }
256     return 0;
257 }
258
259 static inline int process_opentsdb_response(BUFFER *b) {
260     return discard_response(b, "opentsdb");
261 }
262
263
264 // ----------------------------------------------------------------------------
265 // json backend
266
267 static inline int format_dimension_collected_json_plaintext(
268           BUFFER *b                 // the buffer to write data to
269         , const char *prefix        // the prefix to use
270         , RRDHOST *host             // the host this chart comes from
271         , const char *hostname      // the hostname (to override host->hostname)
272         , RRDSET *st                // the chart
273         , RRDDIM *rd                // the dimension
274         , time_t after              // the start timestamp
275         , time_t before             // the end timestamp
276         , uint32_t options          // BACKEND_SOURCE_* bitmap
277 ) {
278     (void)host;
279     (void)after;
280     (void)before;
281     (void)options;
282
283     buffer_sprintf(b, "{"
284         "\"prefix\":\"%s\","
285         "\"hostname\":\"%s\","
286
287         "\"chart_id\":\"%s\","
288         "\"chart_name\":\"%s\","
289         "\"chart_family\":\"%s\","
290         "\"chart_context\": \"%s\","
291         "\"chart_type\":\"%s\","
292         "\"units\": \"%s\","
293
294         "\"id\":\"%s\","
295         "\"name\":\"%s\","
296         "\"value\":" COLLECTED_NUMBER_FORMAT ","
297
298         "\"timestamp\": %u}\n", 
299             prefix,
300             hostname,
301
302             st->id,
303             st->name,
304             st->family,
305             st->context,
306             st->type,
307             st->units,
308
309             rd->id,
310             rd->name,
311             rd->last_collected_value,
312
313             (uint32_t)rd->last_collected_time.tv_sec
314     );
315
316     return 1;
317 }
318
319 static inline int format_dimension_stored_json_plaintext(
320           BUFFER *b                 // the buffer to write data to
321         , const char *prefix        // the prefix to use
322         , RRDHOST *host             // the host this chart comes from
323         , const char *hostname      // the hostname (to override host->hostname)
324         , RRDSET *st                // the chart
325         , RRDDIM *rd                // the dimension
326         , time_t after              // the start timestamp
327         , time_t before             // the end timestamp
328         , uint32_t options          // BACKEND_SOURCE_* bitmap
329 ) {
330     (void)host;
331
332     calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
333
334     if(!isnan(value)) {
335         buffer_sprintf(b, "{"
336             "\"prefix\":\"%s\","
337             "\"hostname\":\"%s\","
338
339             "\"chart_id\":\"%s\","
340             "\"chart_name\":\"%s\","
341             "\"chart_family\":\"%s\","
342             "\"chart_context\": \"%s\","
343             "\"chart_type\":\"%s\","
344             "\"units\": \"%s\","
345
346             "\"id\":\"%s\","
347             "\"name\":\"%s\","
348             "\"value\":" CALCULATED_NUMBER_FORMAT ","
349
350             "\"timestamp\": %u}\n", 
351                 prefix,
352                 hostname,
353                 
354                 st->id,
355                 st->name,
356                 st->family,
357                 st->context,
358                 st->type,
359                 st->units,
360
361                 rd->id,
362                 rd->name,
363                 value, 
364                 
365                 (uint32_t)before
366         );
367         
368         return 1;
369     }
370     return 0;
371 }
372
373 static inline int process_json_response(BUFFER *b) {
374     return discard_response(b, "json");
375 }
376
377
378 // ----------------------------------------------------------------------------
379 // the backend thread
380
381 void *backends_main(void *ptr) {
382     int default_port = 0;
383     int sock = -1;
384     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
385
386     BUFFER *b = buffer_create(1), *response = buffer_create(1);
387     int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, uint32_t) = NULL;
388     int (*backend_response_checker)(BUFFER *) = NULL;
389
390     info("BACKEND thread created with task id %d", gettid());
391
392     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
393         error("Cannot set pthread cancel type to DEFERRED.");
394
395     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
396         error("Cannot set pthread cancel state to ENABLE.");
397
398     // ------------------------------------------------------------------------
399     // collect configuration options
400
401     struct timeval timeout = {
402             .tv_sec = 0,
403             .tv_usec = 0
404     };
405     uint32_t options;
406     int enabled             = config_get_boolean(CONFIG_SECTION_BACKEND, "enabled", 0);
407     const char *source      = config_get(CONFIG_SECTION_BACKEND, "data source", "average");
408     const char *type        = config_get(CONFIG_SECTION_BACKEND, "type", "graphite");
409     const char *destination = config_get(CONFIG_SECTION_BACKEND, "destination", "localhost");
410     const char *prefix      = config_get(CONFIG_SECTION_BACKEND, "prefix", "netdata");
411     const char *hostname    = config_get(CONFIG_SECTION_BACKEND, "hostname", localhost->hostname);
412     int frequency           = (int)config_get_number(CONFIG_SECTION_BACKEND, "update every", 10);
413     int buffer_on_failures  = (int)config_get_number(CONFIG_SECTION_BACKEND, "buffer on failures", 10);
414     long timeoutms          = config_get_number(CONFIG_SECTION_BACKEND, "timeout ms", frequency * 2 * 1000);
415
416     // ------------------------------------------------------------------------
417     // validate configuration options
418     // and prepare for sending data to our backend
419
420     if(!enabled || frequency < 1)
421         goto cleanup;
422
423     if(!strcmp(source, "as collected")) {
424         options = BACKEND_SOURCE_DATA_AS_COLLECTED;
425     }
426     else if(!strcmp(source, "average")) {
427         options = BACKEND_SOURCE_DATA_AVERAGE;
428     }
429     else if(!strcmp(source, "sum") || !strcmp(source, "volume")) {
430         options = BACKEND_SOURCE_DATA_SUM;
431     }
432     else {
433         error("Invalid data source method '%s' for backend given. Disabling backed.", source);
434         goto cleanup;
435     }
436
437     if(timeoutms < 1) {
438         error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000);
439         timeoutms = frequency * 2 * 1000;
440     }
441     timeout.tv_sec  = (timeoutms * 1000) / 1000000;
442     timeout.tv_usec = (timeoutms * 1000) % 1000000;
443
444
445     // ------------------------------------------------------------------------
446     // select the backend type
447
448     if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
449
450         default_port = 2003;
451         backend_response_checker = process_graphite_response;
452
453         if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
454             backend_request_formatter = format_dimension_collected_graphite_plaintext;
455         else
456             backend_request_formatter = format_dimension_stored_graphite_plaintext;
457
458     }
459     else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
460
461         default_port = 4242;
462         backend_response_checker = process_opentsdb_response;
463
464         if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
465             backend_request_formatter = format_dimension_collected_opentsdb_telnet;
466         else
467             backend_request_formatter = format_dimension_stored_opentsdb_telnet;
468
469     }
470     else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
471
472         default_port = 5448;
473         backend_response_checker = process_json_response;
474
475         if (options == BACKEND_SOURCE_DATA_AS_COLLECTED)
476             backend_request_formatter = format_dimension_collected_json_plaintext;
477         else
478             backend_request_formatter = format_dimension_stored_json_plaintext;
479
480     }
481     else {
482         error("Unknown backend type '%s'", type);
483         goto cleanup;
484     }
485
486     if(backend_request_formatter == NULL || backend_response_checker == NULL) {
487         error("backend is misconfigured - disabling it.");
488         goto cleanup;
489     }
490
491
492     // ------------------------------------------------------------------------
493     // prepare the charts for monitoring the backend operation
494
495     struct rusage thread;
496
497     collected_number
498             chart_buffered_metrics = 0,
499             chart_lost_metrics = 0,
500             chart_sent_metrics = 0,
501             chart_buffered_bytes = 0,
502             chart_received_bytes = 0,
503             chart_sent_bytes = 0,
504             chart_receptions = 0,
505             chart_transmission_successes = 0,
506             chart_transmission_failures = 0,
507             chart_data_lost_events = 0,
508             chart_lost_bytes = 0,
509             chart_backend_reconnects = 0,
510             chart_backend_latency = 0;
511
512     RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", 130600, frequency, RRDSET_TYPE_LINE);
513     rrddim_add(chart_metrics, "buffered", NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
514     rrddim_add(chart_metrics, "lost",     NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
515     rrddim_add(chart_metrics, "sent",     NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
516
517     RRDSET *chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KB", 130610, frequency, RRDSET_TYPE_AREA);
518     rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
519     rrddim_add(chart_bytes, "lost",     NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
520     rrddim_add(chart_bytes, "sent",     NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
521     rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
522
523     RRDSET *chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", 130630, frequency, RRDSET_TYPE_LINE);
524     rrddim_add(chart_ops, "write",     NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
525     rrddim_add(chart_ops, "discard",   NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
526     rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
527     rrddim_add(chart_ops, "failure",   NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
528     rrddim_add(chart_ops, "read",      NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
529
530     /*
531      * this is misleading - we can only measure the time we need to send data
532      * this time is not related to the time required for the data to travel to
533      * the backend database and the time that server needed to process them
534      *
535      * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
536      *
537     RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA);
538     rrddim_add(chart_latency, "latency",   NULL,  1, 1000, RRD_ALGORITHM_ABSOLUTE);
539     */
540
541     RRDSET *chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency, RRDSET_TYPE_STACKED);
542     rrddim_add(chart_rusage, "user",   NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
543     rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
544
545
546     // ------------------------------------------------------------------------
547     // prepare the backend main loop
548
549     info("BACKEND configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, frequency, hostname, prefix);
550
551     usec_t step_ut = frequency * USEC_PER_SEC;
552     time_t after = now_realtime_sec();
553     int failures = 0;
554     heartbeat_t hb;
555     heartbeat_init(&hb);
556
557     for(;;) {
558
559         // ------------------------------------------------------------------------
560         // Wait for the next iteration point.
561         heartbeat_next(&hb, step_ut);
562         time_t before = now_realtime_sec();
563
564
565         // ------------------------------------------------------------------------
566         // add to the buffer the data we need to send to the backend
567
568         int pthreadoldcancelstate;
569
570         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
571             error("Cannot set pthread cancel state to DISABLE.");
572
573         rrd_rdlock();
574         RRDHOST *host;
575         rrdhost_foreach_read(host) {
576             if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE)
577                 continue;
578
579             rrdhost_rdlock(host);
580
581             RRDSET *st;
582             rrdset_foreach_read(st, host) {
583                 rrdset_rdlock(st);
584
585                 RRDDIM *rd;
586                 rrddim_foreach_read(rd, st) {
587                     if(rd->last_collected_time.tv_sec >= after)
588                         chart_buffered_metrics += backend_request_formatter(b, prefix, host, (host == localhost)?hostname:host->hostname, st, rd, after, before, options);
589                 }
590                 rrdset_unlock(st);
591             }
592             rrdhost_unlock(host);
593         }
594         rrd_unlock();
595
596         if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
597             error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
598
599         // ------------------------------------------------------------------------
600
601         chart_buffered_bytes = (collected_number)buffer_strlen(b);
602
603         // reset the monitoring chart counters
604         chart_received_bytes =
605         chart_sent_bytes =
606         chart_sent_metrics =
607         chart_lost_metrics =
608         chart_transmission_successes =
609         chart_transmission_failures =
610         chart_data_lost_events =
611         chart_lost_bytes =
612         chart_backend_reconnects =
613         chart_backend_latency = 0;
614
615         if(unlikely(netdata_exit)) break;
616
617         //fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b)); // FIXME
618         //fprintf(stderr, "after = %lu, before = %lu\n", after, before);
619
620         // prepare for the next iteration
621         // to add incrementally data to buffer
622         after = before;
623
624         // ------------------------------------------------------------------------
625         // if we are connected, receive a response, without blocking
626
627         if(likely(sock != -1)) {
628             errno = 0;
629
630             // loop through to collect all data
631             while(sock != -1 && errno != EWOULDBLOCK) {
632                 buffer_need_bytes(response, 4096);
633
634                 ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
635                 if(likely(r > 0)) {
636                     // we received some data
637                     response->len += r;
638                     chart_received_bytes += r;
639                     chart_receptions++;
640                 }
641                 else if(r == 0) {
642                     error("Backend '%s' closed the socket", destination);
643                     close(sock);
644                     sock = -1;
645                 }
646                 else {
647                     // failed to receive data
648                     if(errno != EAGAIN && errno != EWOULDBLOCK) {
649                         error("Cannot receive data from backend '%s'.", destination);
650                     }
651                 }
652             }
653
654             // if we received data, process them
655             if(buffer_strlen(response))
656                 backend_response_checker(response);
657         }
658
659         // ------------------------------------------------------------------------
660         // if we are not connected, connect to a backend server
661
662         if(unlikely(sock == -1)) {
663             usec_t start_ut = now_monotonic_usec();
664             size_t reconnects = 0;
665
666             sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
667
668             chart_backend_reconnects += reconnects;
669             chart_backend_latency += now_monotonic_usec() - start_ut;
670         }
671
672         if(unlikely(netdata_exit)) break;
673
674         // ------------------------------------------------------------------------
675         // if we are connected, send our buffer to the backend server
676
677         if(likely(sock != -1)) {
678             size_t len = buffer_strlen(b);
679             usec_t start_ut = now_monotonic_usec();
680             int flags = 0;
681 #ifdef MSG_NOSIGNAL
682             flags += MSG_NOSIGNAL;
683 #endif
684
685             ssize_t written = send(sock, buffer_tostring(b), len, flags);
686             chart_backend_latency += now_monotonic_usec() - start_ut;
687             if(written != -1 && (size_t)written == len) {
688                 // we sent the data successfully
689                 chart_transmission_successes++;
690                 chart_sent_bytes += written;
691                 chart_sent_metrics = chart_buffered_metrics;
692
693                 // reset the failures count
694                 failures = 0;
695
696                 // empty the buffer
697                 buffer_flush(b);
698             }
699             else {
700                 // oops! we couldn't send (all or some of the) data
701                 error("Failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
702                 chart_transmission_failures++;
703
704                 if(written != -1)
705                     chart_sent_bytes += written;
706
707                 // increment the counter we check for data loss
708                 failures++;
709
710                 // close the socket - we will re-open it next time
711                 close(sock);
712                 sock = -1;
713             }
714         }
715         else {
716             error("Failed to update database backend '%s'", destination);
717             chart_transmission_failures++;
718
719             // increment the counter we check for data loss
720             failures++;
721         }
722
723         if(failures > buffer_on_failures) {
724             // too bad! we are going to lose data
725             chart_lost_bytes += buffer_strlen(b);
726             error("Reached %d backend failures. Flushing buffers to protect this host - this results in data loss on back-end server '%s'", failures, destination);
727             buffer_flush(b);
728             failures = 0;
729             chart_data_lost_events++;
730             chart_lost_metrics = chart_buffered_metrics;
731         }
732
733         if(unlikely(netdata_exit)) break;
734
735         // ------------------------------------------------------------------------
736         // update the monitoring charts
737
738         if(likely(chart_ops->counter_done)) rrdset_next(chart_ops);
739         rrddim_set(chart_ops, "read",         chart_receptions);
740         rrddim_set(chart_ops, "write",        chart_transmission_successes);
741         rrddim_set(chart_ops, "discard",      chart_data_lost_events);
742         rrddim_set(chart_ops, "failure",      chart_transmission_failures);
743         rrddim_set(chart_ops, "reconnect",    chart_backend_reconnects);
744         rrdset_done(chart_ops);
745
746         if(likely(chart_metrics->counter_done)) rrdset_next(chart_metrics);
747         rrddim_set(chart_metrics, "buffered", chart_buffered_metrics);
748         rrddim_set(chart_metrics, "lost",     chart_lost_metrics);
749         rrddim_set(chart_metrics, "sent",     chart_sent_metrics);
750         rrdset_done(chart_metrics);
751
752         if(likely(chart_bytes->counter_done)) rrdset_next(chart_bytes);
753         rrddim_set(chart_bytes, "buffered",   chart_buffered_bytes);
754         rrddim_set(chart_bytes, "lost",       chart_lost_bytes);
755         rrddim_set(chart_bytes, "sent",       chart_sent_bytes);
756         rrddim_set(chart_bytes, "received",   chart_received_bytes);
757         rrdset_done(chart_bytes);
758
759         /*
760         if(likely(chart_latency->counter_done)) rrdset_next(chart_latency);
761         rrddim_set(chart_latency, "latency",  chart_backend_latency);
762         rrdset_done(chart_latency);
763         */
764
765         getrusage(RUSAGE_THREAD, &thread);
766         if(likely(chart_rusage->counter_done)) rrdset_next(chart_rusage);
767         rrddim_set(chart_rusage, "user",   thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
768         rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
769         rrdset_done(chart_rusage);
770
771         if(likely(buffer_strlen(b) == 0))
772             chart_buffered_metrics = 0;
773
774         if(unlikely(netdata_exit)) break;
775     }
776
777 cleanup:
778     if(sock != -1)
779         close(sock);
780
781     buffer_free(b);
782     buffer_free(response);
783
784     info("BACKEND thread exiting");
785
786     static_thread->enabled = 0;
787     pthread_exit(NULL);
788     return NULL;
789 }