]> arthur.barton.de Git - netdata.git/blob - src/backends.c
a2c25e94bdb70cbc758d485a3de682451a744a02
[netdata.git] / src / backends.c
1 #include "common.h"
2
3 // ----------------------------------------------------------------------------
4 // How backends work in netdata:
5 //
6 // 1. There is an independent thread that runs at the required interval
7 //    (for example, once every 10 seconds)
8 //
9 // 2. Every time it wakes, it calls the backend formatting functions to build
10 //    a buffer of data. This is a very fast, memory only operation.
11 //
// 3. If the buffer already includes data, the new data are appended.
//    If the data cannot be sent for more than 'buffer on failures'
//    consecutive iterations, a log is written and the buffer is discarded.
15 //
16 // 4. Then it tries to send all the data. It blocks until all the data are sent
17 //    or the socket returns an error.
18 //    If the time required for this is above the interval, it starts skipping
19 //    intervals, but the calculated values include the entire database, without
20 //    gaps (it remembers the timestamps and continues from where it stopped).
21 //
22 // 5. repeats the above forever.
23 //
24
// data source selection for the backend (the "data source" config option);
// backends_main() stores exactly one of these in its 'options' bitmap
#define BACKEND_SOURCE_DATA_AS_COLLECTED (1 << 0) // send the last collected value of each dimension
#define BACKEND_SOURCE_DATA_AVERAGE      (1 << 1) // send the average of the stored values in the timeframe
#define BACKEND_SOURCE_DATA_SUM          (1 << 2) // send the sum of the stored values in the timeframe
28
29
30 // ----------------------------------------------------------------------------
31 // helper functions for backends
32
33 // calculate the SUM or AVERAGE of a dimension, for any timeframe
34 // may return NAN if the database does not have any value in the give timeframe
35
36 static inline calculated_number backend_calculate_value_from_stored_data(
37           RRDSET *st                // the chart
38         , RRDDIM *rd                // the dimension
39         , time_t after              // the start timestamp
40         , time_t before             // the end timestamp
41         , uint32_t options          // BACKEND_SOURCE_* bitmap
42 ) {
43     // find the edges of the rrd database for this chart
44     time_t first_t = rrdset_first_entry_t(st);
45     time_t last_t  = rrdset_last_entry_t(st);
46
47     if(unlikely(before < first_t || after > last_t))
48         // the chart has not been updated in the wanted timeframe
49         return NAN;
50
51     // align the time-frame
52     // for 'after' also skip the first value by adding st->update_every
53     after  = after  - after  % st->update_every + st->update_every;
54     before = before - before % st->update_every;
55
56     if(unlikely(after < first_t))
57         after = first_t;
58
59     if(unlikely(after > before))
60         // this can happen when st->update_every > before - after
61         before = after;
62
63     if(unlikely(before > last_t))
64         before = last_t;
65
66     size_t counter = 0;
67     calculated_number sum = 0;
68
69     long    start_at_slot = rrdset_time2slot(st, before),
70             stop_at_slot  = rrdset_time2slot(st, after),
71             slot, stop_now = 0;
72
73     for(slot = start_at_slot; !stop_now ; slot--) {
74
75         if(unlikely(slot < 0)) slot = st->entries - 1;
76         if(unlikely(slot == stop_at_slot)) stop_now = 1;
77
78         storage_number n = rd->values[slot];
79
80         if(unlikely(!does_storage_number_exist(n))) {
81             // not collected
82             continue;
83         }
84
85         calculated_number value = unpack_storage_number(n);
86         sum += value;
87
88         counter++;
89     }
90
91     if(unlikely(!counter))
92         return NAN;
93
94     if(unlikely(options & BACKEND_SOURCE_DATA_SUM))
95         return sum;
96
97     return sum / (calculated_number)counter;
98 }
99
100
101 // discard a response received by a backend
102 // after logging a simple of it to error.log
103
104 static inline int discard_response(BUFFER *b, const char *backend) {
105     char sample[1024];
106     const char *s = buffer_tostring(b);
107     char *d = sample, *e = &sample[sizeof(sample) - 1];
108
109     for(; *s && d < e ;s++) {
110         char c = *s;
111         if(unlikely(!isprint(c))) c = ' ';
112         *d++ = c;
113     }
114     *d = '\0';
115
116     info("Received %zu bytes from %s backend. Ignoring them. Sample: '%s'", buffer_strlen(b), backend, sample);
117     buffer_flush(b);
118     return 0;
119 }
120
121
122 // ----------------------------------------------------------------------------
123 // graphite backend
124
125 static inline int format_dimension_collected_graphite_plaintext(
126           BUFFER *b                 // the buffer to write data to
127         , const char *prefix        // the prefix to use
128         , RRDHOST *host             // the host this chart comes from
129         , const char *hostname      // the hostname (to override host->hostname)
130         , RRDSET *st                // the chart
131         , RRDDIM *rd                // the dimension
132         , time_t after              // the start timestamp
133         , time_t before             // the end timestamp
134         , uint32_t options          // BACKEND_SOURCE_* bitmap
135 ) {
136     (void)host;
137     (void)after;
138     (void)before;
139     (void)options;
140
141     buffer_sprintf(
142             b
143             , "%s.%s.%s.%s " COLLECTED_NUMBER_FORMAT " %u\n"
144             , prefix
145             , hostname
146             , st->id
147             , rd->id
148             , rd->last_collected_value
149             , (uint32_t)rd->last_collected_time.tv_sec
150     );
151
152     return 1;
153 }
154
155 static inline int format_dimension_stored_graphite_plaintext(
156           BUFFER *b                 // the buffer to write data to
157         , const char *prefix        // the prefix to use
158         , RRDHOST *host             // the host this chart comes from
159         , const char *hostname      // the hostname (to override host->hostname)
160         , RRDSET *st                // the chart
161         , RRDDIM *rd                // the dimension
162         , time_t after              // the start timestamp
163         , time_t before             // the end timestamp
164         , uint32_t options          // BACKEND_SOURCE_* bitmap
165 ) {
166     (void)host;
167
168     calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
169
170     if(!isnan(value)) {
171
172         buffer_sprintf(
173                 b
174                 , "%s.%s.%s.%s " CALCULATED_NUMBER_FORMAT " %u\n"
175                 , prefix
176                 , hostname
177                 , st->id
178                 , rd->id
179                 , value
180                 , (uint32_t) before
181         );
182
183         return 1;
184     }
185     return 0;
186 }
187
188 static inline int process_graphite_response(BUFFER *b) {
189     return discard_response(b, "graphite");
190 }
191
192
193 // ----------------------------------------------------------------------------
194 // opentsdb backend
195
196 static inline int format_dimension_collected_opentsdb_telnet(
197           BUFFER *b                 // the buffer to write data to
198         , const char *prefix        // the prefix to use
199         , RRDHOST *host             // the host this chart comes from
200         , const char *hostname      // the hostname (to override host->hostname)
201         , RRDSET *st                // the chart
202         , RRDDIM *rd                // the dimension
203         , time_t after              // the start timestamp
204         , time_t before             // the end timestamp
205         , uint32_t options          // BACKEND_SOURCE_* bitmap
206 ) {
207     (void)host;
208     (void)after;
209     (void)before;
210     (void)options;
211
212     buffer_sprintf(
213             b
214             , "put %s.%s.%s %u " COLLECTED_NUMBER_FORMAT " host=%s\n"
215             , prefix
216             , st->id
217             , rd->id
218             , (uint32_t)rd->last_collected_time.tv_sec
219             , rd->last_collected_value
220             , hostname
221     );
222
223     return 1;
224 }
225
226 static inline int format_dimension_stored_opentsdb_telnet(
227           BUFFER *b                 // the buffer to write data to
228         , const char *prefix        // the prefix to use
229         , RRDHOST *host             // the host this chart comes from
230         , const char *hostname      // the hostname (to override host->hostname)
231         , RRDSET *st                // the chart
232         , RRDDIM *rd                // the dimension
233         , time_t after              // the start timestamp
234         , time_t before             // the end timestamp
235         , uint32_t options          // BACKEND_SOURCE_* bitmap
236 ) {
237     (void)host;
238
239     calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
240
241     if(!isnan(value)) {
242
243         buffer_sprintf(
244                 b
245                 , "put %s.%s.%s %u " CALCULATED_NUMBER_FORMAT " host=%s\n"
246                 , prefix
247                 , st->id
248                 , rd->id
249                 , (uint32_t) before
250                 , value
251                 , hostname
252         );
253
254         return 1;
255     }
256     return 0;
257 }
258
259 static inline int process_opentsdb_response(BUFFER *b) {
260     return discard_response(b, "opentsdb");
261 }
262
263
264 // ----------------------------------------------------------------------------
265 // json backend
266
267 static inline int format_dimension_collected_json_plaintext(
268           BUFFER *b                 // the buffer to write data to
269         , const char *prefix        // the prefix to use
270         , RRDHOST *host             // the host this chart comes from
271         , const char *hostname      // the hostname (to override host->hostname)
272         , RRDSET *st                // the chart
273         , RRDDIM *rd                // the dimension
274         , time_t after              // the start timestamp
275         , time_t before             // the end timestamp
276         , uint32_t options          // BACKEND_SOURCE_* bitmap
277 ) {
278     (void)host;
279     (void)after;
280     (void)before;
281     (void)options;
282
283     buffer_sprintf(b, "{"
284         "\"prefix\":\"%s\","
285         "\"hostname\":\"%s\","
286
287         "\"chart_id\":\"%s\","
288         "\"chart_name\":\"%s\","
289         "\"chart_family\":\"%s\","
290         "\"chart_context\": \"%s\","
291         "\"chart_type\":\"%s\","
292         "\"units\": \"%s\","
293
294         "\"id\":\"%s\","
295         "\"name\":\"%s\","
296         "\"value\":" COLLECTED_NUMBER_FORMAT ","
297
298         "\"timestamp\": %u}\n", 
299             prefix,
300             hostname,
301
302             st->id,
303             st->name,
304             st->family,
305             st->context,
306             st->type,
307             st->units,
308
309             rd->id,
310             rd->name,
311             rd->last_collected_value,
312
313             (uint32_t)rd->last_collected_time.tv_sec
314     );
315
316     return 1;
317 }
318
319 static inline int format_dimension_stored_json_plaintext(
320           BUFFER *b                 // the buffer to write data to
321         , const char *prefix        // the prefix to use
322         , RRDHOST *host             // the host this chart comes from
323         , const char *hostname      // the hostname (to override host->hostname)
324         , RRDSET *st                // the chart
325         , RRDDIM *rd                // the dimension
326         , time_t after              // the start timestamp
327         , time_t before             // the end timestamp
328         , uint32_t options          // BACKEND_SOURCE_* bitmap
329 ) {
330     (void)host;
331
332     calculated_number value = backend_calculate_value_from_stored_data(st, rd, after, before, options);
333
334     if(!isnan(value)) {
335         buffer_sprintf(b, "{"
336             "\"prefix\":\"%s\","
337             "\"hostname\":\"%s\","
338
339             "\"chart_id\":\"%s\","
340             "\"chart_name\":\"%s\","
341             "\"chart_family\":\"%s\","
342             "\"chart_context\": \"%s\","
343             "\"chart_type\":\"%s\","
344             "\"units\": \"%s\","
345
346             "\"id\":\"%s\","
347             "\"name\":\"%s\","
348             "\"value\":" CALCULATED_NUMBER_FORMAT ","
349
350             "\"timestamp\": %u}\n", 
351                 prefix,
352                 hostname,
353                 
354                 st->id,
355                 st->name,
356                 st->family,
357                 st->context,
358                 st->type,
359                 st->units,
360
361                 rd->id,
362                 rd->name,
363                 value, 
364                 
365                 (uint32_t)before
366         );
367         
368         return 1;
369     }
370     return 0;
371 }
372
373 static inline int process_json_response(BUFFER *b) {
374     return discard_response(b, "json");
375 }
376
377
378 // ----------------------------------------------------------------------------
379 // the backend thread
380
381 void *backends_main(void *ptr) {
382     int default_port = 0;
383     int sock = -1;
384     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
385
386     BUFFER *b = buffer_create(1), *response = buffer_create(1);
387     int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, uint32_t) = NULL;
388     int (*backend_response_checker)(BUFFER *) = NULL;
389
390     info("BACKEND thread created with task id %d", gettid());
391
392     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
393         error("Cannot set pthread cancel type to DEFERRED.");
394
395     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
396         error("Cannot set pthread cancel state to ENABLE.");
397
398     // ------------------------------------------------------------------------
399     // collect configuration options
400
401     struct timeval timeout = {
402             .tv_sec = 0,
403             .tv_usec = 0
404     };
405     uint32_t options;
406     int enabled             = config_get_boolean(CONFIG_SECTION_BACKEND, "enabled", 0);
407     const char *source      = config_get(CONFIG_SECTION_BACKEND, "data source", "average");
408     const char *type        = config_get(CONFIG_SECTION_BACKEND, "type", "graphite");
409     const char *destination = config_get(CONFIG_SECTION_BACKEND, "destination", "localhost");
410     const char *prefix      = config_get(CONFIG_SECTION_BACKEND, "prefix", "netdata");
411     const char *hostname    = config_get(CONFIG_SECTION_BACKEND, "hostname", localhost->hostname);
412     int frequency           = (int)config_get_number(CONFIG_SECTION_BACKEND, "update every", 10);
413     int buffer_on_failures  = (int)config_get_number(CONFIG_SECTION_BACKEND, "buffer on failures", 10);
414     long timeoutms          = config_get_number(CONFIG_SECTION_BACKEND, "timeout ms", frequency * 2 * 1000);
415
416     // ------------------------------------------------------------------------
417     // validate configuration options
418     // and prepare for sending data to our backend
419
420     if(!enabled || frequency < 1)
421         goto cleanup;
422
423     if(!strcmp(source, "as collected")) {
424         options = BACKEND_SOURCE_DATA_AS_COLLECTED;
425     }
426     else if(!strcmp(source, "average")) {
427         options = BACKEND_SOURCE_DATA_AVERAGE;
428     }
429     else if(!strcmp(source, "sum") || !strcmp(source, "volume")) {
430         options = BACKEND_SOURCE_DATA_SUM;
431     }
432     else {
433         error("Invalid data source method '%s' for backend given. Disabling backed.", source);
434         goto cleanup;
435     }
436
437     if(timeoutms < 1) {
438         error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000);
439         timeoutms = frequency * 2 * 1000;
440     }
441     timeout.tv_sec  = (timeoutms * 1000) / 1000000;
442     timeout.tv_usec = (timeoutms * 1000) % 1000000;
443
444
445     // ------------------------------------------------------------------------
446     // select the backend type
447
448     if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
449
450         default_port = 2003;
451         backend_response_checker = process_graphite_response;
452
453         if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
454             backend_request_formatter = format_dimension_collected_graphite_plaintext;
455         else
456             backend_request_formatter = format_dimension_stored_graphite_plaintext;
457
458     }
459     else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
460
461         default_port = 4242;
462         backend_response_checker = process_opentsdb_response;
463
464         if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
465             backend_request_formatter = format_dimension_collected_opentsdb_telnet;
466         else
467             backend_request_formatter = format_dimension_stored_opentsdb_telnet;
468
469     }
470     else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext")) {
471
472         default_port = 5448;
473         backend_response_checker = process_json_response;
474
475         if (options == BACKEND_SOURCE_DATA_AS_COLLECTED)
476             backend_request_formatter = format_dimension_collected_json_plaintext;
477         else
478             backend_request_formatter = format_dimension_stored_json_plaintext;
479
480     }
481     else {
482         error("Unknown backend type '%s'", type);
483         goto cleanup;
484     }
485
486     if(backend_request_formatter == NULL || backend_response_checker == NULL) {
487         error("backend is misconfigured - disabling it.");
488         goto cleanup;
489     }
490
491
492     // ------------------------------------------------------------------------
493     // prepare the charts for monitoring the backend operation
494
495     struct rusage thread;
496
497     collected_number
498             chart_buffered_metrics = 0,
499             chart_lost_metrics = 0,
500             chart_sent_metrics = 0,
501             chart_buffered_bytes = 0,
502             chart_received_bytes = 0,
503             chart_sent_bytes = 0,
504             chart_receptions = 0,
505             chart_transmission_successes = 0,
506             chart_transmission_failures = 0,
507             chart_data_lost_events = 0,
508             chart_lost_bytes = 0,
509             chart_backend_reconnects = 0,
510             chart_backend_latency = 0;
511
512     RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", 130600, frequency, RRDSET_TYPE_LINE);
513     rrddim_add(chart_metrics, "buffered", NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
514     rrddim_add(chart_metrics, "lost",     NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
515     rrddim_add(chart_metrics, "sent",     NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
516
517     RRDSET *chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KB", 130610, frequency, RRDSET_TYPE_AREA);
518     rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
519     rrddim_add(chart_bytes, "lost",     NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
520     rrddim_add(chart_bytes, "sent",     NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
521     rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
522
523     RRDSET *chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", 130630, frequency, RRDSET_TYPE_LINE);
524     rrddim_add(chart_ops, "write",     NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
525     rrddim_add(chart_ops, "discard",   NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
526     rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
527     rrddim_add(chart_ops, "failure",   NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
528     rrddim_add(chart_ops, "read",      NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
529
530     /*
531      * this is misleading - we can only measure the time we need to send data
532      * this time is not related to the time required for the data to travel to
533      * the backend database and the time that server needed to process them
534      *
535      * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
536      *
537     RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA);
538     rrddim_add(chart_latency, "latency",   NULL,  1, 1000, RRD_ALGORITHM_ABSOLUTE);
539     */
540
541     RRDSET *chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency, RRDSET_TYPE_STACKED);
542     rrddim_add(chart_rusage, "user",   NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
543     rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
544
545
546     // ------------------------------------------------------------------------
547     // prepare the backend main loop
548
549     info("BACKEND configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, frequency, hostname, prefix);
550
551     usec_t step_ut = frequency * USEC_PER_SEC;
552     time_t after = now_realtime_sec();
553     int failures = 0;
554     heartbeat_t hb;
555     heartbeat_init(&hb);
556
557     for(;;) {
558
559         // ------------------------------------------------------------------------
560         // Wait for the next iteration point.
561         heartbeat_next(&hb, step_ut);
562         time_t before = now_realtime_sec();
563
564
565         // ------------------------------------------------------------------------
566         // add to the buffer the data we need to send to the backend
567
568         int pthreadoldcancelstate;
569
570         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
571             error("Cannot set pthread cancel state to DISABLE.");
572
573         size_t count_hosts = 0;
574         size_t count_charts_total = 0;
575         size_t count_dims_total = 0;
576
577         rrd_rdlock();
578         RRDHOST *host;
579         rrdhost_foreach_read(host) {
580             if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) {
581                 debug(D_BACKEND, "BACKEND: not sending host '%s' because its memory mode is '%s'", host->hostname, rrd_memory_mode_name(host->rrd_memory_mode));
582                 continue;
583             }
584
585             rrdhost_rdlock(host);
586
587             count_hosts++;
588             size_t count_charts = 0;
589             size_t count_dims = 0;
590             size_t count_dims_skipped = 0;
591
592             const char *__hostname = (host == localhost)?hostname:host->hostname;
593
594             RRDSET *st;
595             rrdset_foreach_read(st, host) {
596                 rrdset_rdlock(st);
597
598                 count_charts++;
599
600                 RRDDIM *rd;
601                 rrddim_foreach_read(rd, st) {
602                     if(likely(rd->last_collected_time.tv_sec >= after)) {
603                         chart_buffered_metrics += backend_request_formatter(b, prefix, host, __hostname, st, rd, after, before, options);
604                         count_dims++;
605                     }
606                     else {
607                         debug(D_BACKEND, "BACKEND: not sending dimension '%s' of chart '%s' from host '%s', its last data collection is not within our timeframe", rd->id, st->id, __hostname);
608                         count_dims_skipped++;
609                     }
610                 }
611                 rrdset_unlock(st);
612             }
613
614             debug(D_BACKEND, "BACKEND: sending host '%s', metrics of %zu dimensions, of %zu charts. Skipped %zu dimensions.", __hostname, count_dims, count_charts, count_dims_skipped);
615             count_charts_total += count_charts;
616             count_dims_total += count_dims;
617
618             rrdhost_unlock(host);
619         }
620         rrd_unlock();
621
622         debug(D_BACKEND, "BACKEND: buffer has %zu bytes, added metrics for %zu dimensions, of %zu charts, from %zu hosts", buffer_strlen(b), count_dims_total, count_charts_total, count_hosts);
623
624         if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
625             error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
626
627         // ------------------------------------------------------------------------
628
629         chart_buffered_bytes = (collected_number)buffer_strlen(b);
630
631         // reset the monitoring chart counters
632         chart_received_bytes =
633         chart_sent_bytes =
634         chart_sent_metrics =
635         chart_lost_metrics =
636         chart_transmission_successes =
637         chart_transmission_failures =
638         chart_data_lost_events =
639         chart_lost_bytes =
640         chart_backend_reconnects =
641         chart_backend_latency = 0;
642
643         if(unlikely(netdata_exit)) break;
644
645         //fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b)); // FIXME
646         //fprintf(stderr, "after = %lu, before = %lu\n", after, before);
647
648         // prepare for the next iteration
649         // to add incrementally data to buffer
650         after = before;
651
652         // ------------------------------------------------------------------------
653         // if we are connected, receive a response, without blocking
654
655         if(likely(sock != -1)) {
656             errno = 0;
657
658             // loop through to collect all data
659             while(sock != -1 && errno != EWOULDBLOCK) {
660                 buffer_need_bytes(response, 4096);
661
662                 ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
663                 if(likely(r > 0)) {
664                     // we received some data
665                     response->len += r;
666                     chart_received_bytes += r;
667                     chart_receptions++;
668                 }
669                 else if(r == 0) {
670                     error("Backend '%s' closed the socket", destination);
671                     close(sock);
672                     sock = -1;
673                 }
674                 else {
675                     // failed to receive data
676                     if(errno != EAGAIN && errno != EWOULDBLOCK) {
677                         error("Cannot receive data from backend '%s'.", destination);
678                     }
679                 }
680             }
681
682             // if we received data, process them
683             if(buffer_strlen(response))
684                 backend_response_checker(response);
685         }
686
687         // ------------------------------------------------------------------------
688         // if we are not connected, connect to a backend server
689
690         if(unlikely(sock == -1)) {
691             usec_t start_ut = now_monotonic_usec();
692             size_t reconnects = 0;
693
694             sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
695
696             chart_backend_reconnects += reconnects;
697             chart_backend_latency += now_monotonic_usec() - start_ut;
698         }
699
700         if(unlikely(netdata_exit)) break;
701
702         // ------------------------------------------------------------------------
703         // if we are connected, send our buffer to the backend server
704
705         if(likely(sock != -1)) {
706             size_t len = buffer_strlen(b);
707             usec_t start_ut = now_monotonic_usec();
708             int flags = 0;
709 #ifdef MSG_NOSIGNAL
710             flags += MSG_NOSIGNAL;
711 #endif
712
713             ssize_t written = send(sock, buffer_tostring(b), len, flags);
714             chart_backend_latency += now_monotonic_usec() - start_ut;
715             if(written != -1 && (size_t)written == len) {
716                 // we sent the data successfully
717                 chart_transmission_successes++;
718                 chart_sent_bytes += written;
719                 chart_sent_metrics = chart_buffered_metrics;
720
721                 // reset the failures count
722                 failures = 0;
723
724                 // empty the buffer
725                 buffer_flush(b);
726             }
727             else {
728                 // oops! we couldn't send (all or some of the) data
729                 error("Failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
730                 chart_transmission_failures++;
731
732                 if(written != -1)
733                     chart_sent_bytes += written;
734
735                 // increment the counter we check for data loss
736                 failures++;
737
738                 // close the socket - we will re-open it next time
739                 close(sock);
740                 sock = -1;
741             }
742         }
743         else {
744             error("Failed to update database backend '%s'", destination);
745             chart_transmission_failures++;
746
747             // increment the counter we check for data loss
748             failures++;
749         }
750
751         if(failures > buffer_on_failures) {
752             // too bad! we are going to lose data
753             chart_lost_bytes += buffer_strlen(b);
754             error("Reached %d backend failures. Flushing buffers to protect this host - this results in data loss on back-end server '%s'", failures, destination);
755             buffer_flush(b);
756             failures = 0;
757             chart_data_lost_events++;
758             chart_lost_metrics = chart_buffered_metrics;
759         }
760
761         if(unlikely(netdata_exit)) break;
762
763         // ------------------------------------------------------------------------
764         // update the monitoring charts
765
766         if(likely(chart_ops->counter_done)) rrdset_next(chart_ops);
767         rrddim_set(chart_ops, "read",         chart_receptions);
768         rrddim_set(chart_ops, "write",        chart_transmission_successes);
769         rrddim_set(chart_ops, "discard",      chart_data_lost_events);
770         rrddim_set(chart_ops, "failure",      chart_transmission_failures);
771         rrddim_set(chart_ops, "reconnect",    chart_backend_reconnects);
772         rrdset_done(chart_ops);
773
774         if(likely(chart_metrics->counter_done)) rrdset_next(chart_metrics);
775         rrddim_set(chart_metrics, "buffered", chart_buffered_metrics);
776         rrddim_set(chart_metrics, "lost",     chart_lost_metrics);
777         rrddim_set(chart_metrics, "sent",     chart_sent_metrics);
778         rrdset_done(chart_metrics);
779
780         if(likely(chart_bytes->counter_done)) rrdset_next(chart_bytes);
781         rrddim_set(chart_bytes, "buffered",   chart_buffered_bytes);
782         rrddim_set(chart_bytes, "lost",       chart_lost_bytes);
783         rrddim_set(chart_bytes, "sent",       chart_sent_bytes);
784         rrddim_set(chart_bytes, "received",   chart_received_bytes);
785         rrdset_done(chart_bytes);
786
787         /*
788         if(likely(chart_latency->counter_done)) rrdset_next(chart_latency);
789         rrddim_set(chart_latency, "latency",  chart_backend_latency);
790         rrdset_done(chart_latency);
791         */
792
793         getrusage(RUSAGE_THREAD, &thread);
794         if(likely(chart_rusage->counter_done)) rrdset_next(chart_rusage);
795         rrddim_set(chart_rusage, "user",   thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
796         rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
797         rrdset_done(chart_rusage);
798
799         if(likely(buffer_strlen(b) == 0))
800             chart_buffered_metrics = 0;
801
802         if(unlikely(netdata_exit)) break;
803     }
804
805 cleanup:
806     if(sock != -1)
807         close(sock);
808
809     buffer_free(b);
810     buffer_free(response);
811
812     info("BACKEND thread exiting");
813
814     static_thread->enabled = 0;
815     pthread_exit(NULL);
816     return NULL;
817 }