]> arthur.barton.de Git - netdata.git/blob - src/backends.c
operational opentsdb backend
[netdata.git] / src / backends.c
1 #include "common.h"
2
3 int connect_to_socket4(const char *ip, int port) {
4     int sock;
5
6     debug(D_LISTENER, "IPv4 connecting to ip '%s' port %d", ip, port);
7
8     sock = socket(AF_INET, SOCK_STREAM, 0);
9     if(sock < 0) {
10         error("IPv4 socket() on ip '%s' port %d failed.", ip, port);
11         return -1;
12     }
13
14     struct sockaddr_in name;
15     memset(&name, 0, sizeof(struct sockaddr_in));
16     name.sin_family = AF_INET;
17     name.sin_port = htons(port);
18
19     int ret = inet_pton(AF_INET, ip, (void *)&name.sin_addr.s_addr);
20     if(ret != 1) {
21         error("Failed to convert '%s' to a valid IPv4 address.", ip);
22         close(sock);
23         return -1;
24     }
25
26     if(connect(sock, (struct sockaddr *) &name, sizeof(name)) < 0) {
27         close(sock);
28         error("IPv4 failed to connect to '%s', port %d", ip, port);
29         return -1;
30     }
31
32     debug(D_LISTENER, "Connected to IPv4 ip '%s' port %d", ip, port);
33     return sock;
34 }
35
36 int connect_to_socket6(const char *ip, int port) {
37     int sock = -1;
38     int ipv6only = 1;
39
40     debug(D_LISTENER, "IPv6 connecting to ip '%s' port %d", ip, port);
41
42     sock = socket(AF_INET6, SOCK_STREAM, 0);
43     if (sock < 0) {
44         error("IPv6 socket() on ip '%s' port %d failed.", ip, port);
45         return -1;
46     }
47
48     /* IPv6 only */
49     if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&ipv6only, sizeof(ipv6only)) != 0)
50         error("Cannot set IPV6_V6ONLY on ip '%s' port's %d.", ip, port);
51
52     struct sockaddr_in6 name;
53     memset(&name, 0, sizeof(struct sockaddr_in6));
54     name.sin6_family = AF_INET6;
55     name.sin6_port = htons ((uint16_t) port);
56
57     int ret = inet_pton(AF_INET6, ip, (void *)&name.sin6_addr.s6_addr);
58     if(ret != 1) {
59         error("Failed to convert IP '%s' to a valid IPv6 address.", ip);
60         close(sock);
61         return -1;
62     }
63
64     name.sin6_scope_id = 0;
65
66     if(connect(sock, (struct sockaddr *)&name, sizeof(name)) < 0) {
67         close(sock);
68         error("IPv6 failed to connect to '%s', port %d", ip, port);
69         return -1;
70     }
71
72     debug(D_LISTENER, "Connected to IPv6 ip '%s' port %d", ip, port);
73     return sock;
74 }
75
76
77 static inline int connect_to_one(const char *definition, int default_port) {
78     struct addrinfo hints;
79     struct addrinfo *result = NULL, *rp = NULL;
80
81     char buffer[strlen(definition) + 1];
82     strcpy(buffer, definition);
83
84     char buffer2[10 + 1];
85     snprintfz(buffer2, 10, "%d", default_port);
86
87     char *ip = buffer, *port = buffer2;
88
89     char *e = ip;
90     if(*e == '[') {
91         e = ++ip;
92         while(*e && *e != ']') e++;
93         if(*e == ']') {
94             *e = '\0';
95             e++;
96         }
97     }
98     else {
99         while(*e && *e != ':') e++;
100     }
101
102     if(*e == ':') {
103         port = e + 1;
104         *e = '\0';
105     }
106
107     if(!*ip)
108         return -1;
109
110     if(!*port)
111         port = buffer2;
112
113     memset(&hints, 0, sizeof(struct addrinfo));
114     hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
115     hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
116     hints.ai_flags = AI_PASSIVE;    /* For wildcard IP address */
117     hints.ai_protocol = 0;          /* Any protocol */
118     hints.ai_canonname = NULL;
119     hints.ai_addr = NULL;
120     hints.ai_next = NULL;
121
122     int r = getaddrinfo(ip, port, &hints, &result);
123     if (r != 0) {
124         error("Cannot resolve host '%s', port '%s': %s\n", ip, port, gai_strerror(r));
125         return -1;
126     }
127
128     int fd = -1;
129     for (rp = result; rp != NULL && fd == -1; rp = rp->ai_next) {
130         char rip[INET_ADDRSTRLEN + INET6_ADDRSTRLEN] = "INVALID";
131         int rport;
132
133         switch (rp->ai_addr->sa_family) {
134             case AF_INET: {
135                 struct sockaddr_in *sin = (struct sockaddr_in *) rp->ai_addr;
136                 inet_ntop(AF_INET, &sin->sin_addr, rip, INET_ADDRSTRLEN);
137                 rport = ntohs(sin->sin_port);
138                 fd = connect_to_socket4(rip, rport);
139                 break;
140             }
141
142             case AF_INET6: {
143                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) rp->ai_addr;
144                 inet_ntop(AF_INET6, &sin6->sin6_addr, rip, INET6_ADDRSTRLEN);
145                 rport = ntohs(sin6->sin6_port);
146                 fd = connect_to_socket6(rip, rport);
147                 break;
148             }
149         }
150     }
151
152     freeaddrinfo(result);
153
154     return fd;
155 }
156
157 static inline calculated_number backend_duration_average(RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
158     time_t first_t = rrdset_first_entry_t(st);
159     time_t last_t = rrdset_last_entry_t(st);
160
161     if(unlikely(before - after < st->update_every && after != after - after % st->update_every))
162         // when st->update_every is bigger than the frequency we send data to backend
163         // skip the iterations that are not aligned to the database
164         return NAN;
165
166     // align the time-frame
167     // for 'after' also skip the first value by adding st->update_every
168     after  = after  - after  % st->update_every + st->update_every;
169     before = before - before % st->update_every;
170
171     if(unlikely(after < first_t))
172         after = first_t;
173
174     if(unlikely(after > before))
175         // this can happen when the st->update_every > before - after
176         before = after;
177
178     if(unlikely(before > last_t))
179         before = last_t;
180
181     size_t counter = 0;
182     calculated_number sum = 0;
183
184     long    start_at_slot = rrdset_time2slot(st, before),
185             stop_at_slot  = rrdset_time2slot(st, after),
186             slot, stop_now = 0;
187
188     for(slot = start_at_slot; !stop_now ; slot--) {
189         if(unlikely(slot < 0)) slot = st->entries - 1;
190         if(unlikely(slot == stop_at_slot)) stop_now = 1;
191
192         storage_number n = rd->values[slot];
193         if(unlikely(!does_storage_number_exist(n))) continue;
194
195         calculated_number value = unpack_storage_number(n);
196         sum += value;
197         counter++;
198     }
199
200     if(!counter)
201         return NAN;
202
203     return sum / (calculated_number)counter;
204 }
205
206 static inline void format_dimension_collected_graphite_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
207     (void)before;
208
209     time_t last = rd->last_collected_time.tv_sec;
210     if(likely(last > after))
211         buffer_sprintf(b, "%s.%s.%s.%s " COLLECTED_NUMBER_FORMAT " %u\n", prefix, host->hostname, st->id, rd->id, rd->last_collected_value, (uint32_t)last);
212 }
213
214 static inline void format_dimension_stored_graphite_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
215     calculated_number value = backend_duration_average(st, rd, after, before);
216     if(!isnan(value))
217         buffer_sprintf(b, "%s.%s.%s.%s " CALCULATED_NUMBER_FORMAT " %u\n", prefix, host->hostname, st->id, rd->id, value, (uint32_t)before);
218 }
219
220 static inline void format_dimension_collected_opentsdb_telnet(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
221     (void)before;
222
223     time_t last = rd->last_collected_time.tv_sec;
224     if(likely(last > after))
225         buffer_sprintf(b, "put %s.%s.%s %u " COLLECTED_NUMBER_FORMAT " host=%s\n", prefix, st->id, rd->id, (uint32_t)last, rd->last_collected_value, host->hostname);
226 }
227
228 static inline void format_dimension_stored_opentsdb_telnet(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
229     calculated_number value = backend_duration_average(st, rd, after, before);
230     if(!isnan(value))
231         buffer_sprintf(b, "put %s.%s.%s %u " CALCULATED_NUMBER_FORMAT " host=%s\n", prefix, st->id, rd->id, (uint32_t)before, value, host->hostname);
232 }
233
234 void *backends_main(void *ptr) {
235     (void)ptr;
236
237     BUFFER *b = buffer_create(1);
238     void (*formatter)(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before);
239
240     info("BACKENDs thread created with task id %d", gettid());
241
242     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
243         error("Cannot set pthread cancel type to DEFERRED.");
244
245     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
246         error("Cannot set pthread cancel state to ENABLE.");
247
248     int default_port = 0;
249     int sock = -1;
250     int enabled = config_get_boolean("backends", "enable", 0);
251     const char *source = config_get("backends", "data source", "as stored");
252     const char *type = config_get("backends", "type", "graphite");
253     const char *destination = config_get("backends", "destination", "localhost");
254     const char *prefix = config_get("backends", "prefix", "netdata");
255     int frequency = (int)config_get_number("backends", "update every", 30);
256     int buffer_on_failures = (int)config_get_number("backends", "buffer on failures", 10);
257
258     if(!enabled)
259         goto cleanup;
260
261     if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
262         default_port = 2003;
263         if(!strcmp(source, "as collected"))
264             formatter = format_dimension_collected_graphite_plaintext;
265         else
266             formatter = format_dimension_stored_graphite_plaintext;
267     }
268     else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
269         default_port = 4242;
270         if(!strcmp(source, "as collected"))
271             formatter = format_dimension_collected_opentsdb_telnet;
272         else
273             formatter = format_dimension_stored_opentsdb_telnet;
274     }
275     else {
276         error("Unknown backend type '%s'", type);
277         goto cleanup;
278     }
279
280     unsigned long long step_ut = frequency * 1000000ULL;
281     unsigned long long random_ut = time_usec() % (step_ut / 2);
282     time_t before = (time_t)((time_usec() - step_ut) / 10000000ULL);
283     time_t after = before;
284     int failures = 0;
285
286     for(;;) {
287         unsigned long long now_ut = time_usec();
288         unsigned long long next_ut = now_ut - (now_ut % step_ut) + step_ut;
289         before = (time_t)(next_ut / 1000000ULL);
290
291         // add a little delay (1/4 of the step) plus some randomness
292         next_ut += (step_ut / 4) + random_ut;
293
294         while(now_ut < next_ut) {
295             sleep_usec(next_ut - now_ut);
296             now_ut = time_usec();
297         }
298
299         RRDSET *st;
300         int pthreadoldcancelstate;
301
302         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
303             error("Cannot set pthread cancel state to DISABLE.");
304
305         rrdhost_rdlock(&localhost);
306         for(st = localhost.rrdset_root; st ;st = st->next) {
307             pthread_rwlock_rdlock(&st->rwlock);
308
309             RRDDIM *rd;
310             for(rd = st->dimensions; rd ;rd = rd->next)
311                 formatter(b, prefix, &localhost, st, rd, after, before);
312
313             pthread_rwlock_unlock(&st->rwlock);
314         }
315         rrdhost_unlock(&localhost);
316
317         if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
318             error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
319
320         if(unlikely(netdata_exit)) break;
321
322         //fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b));
323         //fprintf(stderr, "after = %lu, before = %lu\n", after, before);
324
325         if(unlikely(sock == -1)) {
326             const char *s = destination;
327             while(*s) {
328                 const char *e = s;
329
330                 // skip separators, moving both s(tart) and e(nd)
331                 while(isspace(*e) || *e == ',') s = ++e;
332
333                 // move e(nd) to the first separator
334                 while(*e && !isspace(*e) && *e != ',') e++;
335
336                 // is there anything?
337                 if(!*s || s == e) break;
338
339                 char buf[e - s + 1];
340                 strncpyz(buf, s, e - s);
341                 sock = connect_to_one(buf, default_port);
342                 if(sock != -1) break;
343                 s = e;
344             }
345         }
346
347         if(unlikely(netdata_exit)) break;
348
349         if(likely(sock != -1)) {
350             size_t len = buffer_strlen(b);
351             ssize_t written = write(sock, buffer_tostring(b), len);
352             if(written != -1 && (size_t)written == len) {
353                 failures = 0;
354                 buffer_flush(b);
355             }
356             else {
357                 failures++;
358                 error("Failed to write data to database backend '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.", destination, len, written);
359                 close(sock);
360                 sock = -1;
361             }
362
363             after = before;
364         }
365         else {
366             failures++;
367             error("Failed to update database backend '%s'", destination);
368         }
369
370         if(failures > buffer_on_failures) {
371             error("Reached %d backend failures. Flushing buffers to protect this host - this results in data loss on back-end server '%s'", failures, destination);
372             buffer_flush(b);
373             failures = 0;
374         }
375
376         if(unlikely(netdata_exit)) break;
377     }
378
379 cleanup:
380     if(sock != -1)
381         close(sock);
382
383     info("BACKENDs thread exiting");
384
385     pthread_exit(NULL);
386     return NULL;
387 }