]> arthur.barton.de Git - netdata.git/blob - src/backends.c
initial backends code
[netdata.git] / src / backends.c
1 #include "common.h"
2
3 int connect_to_socket4(const char *ip, int port) {
4     int sock;
5
6     debug(D_LISTENER, "IPv4 connecting to ip '%s' port %d", ip, port);
7
8     sock = socket(AF_INET, SOCK_STREAM, 0);
9     if(sock < 0) {
10         error("IPv4 socket() on ip '%s' port %d failed.", ip, port);
11         return -1;
12     }
13
14     struct sockaddr_in name;
15     memset(&name, 0, sizeof(struct sockaddr_in));
16     name.sin_family = AF_INET;
17     name.sin_port = htons(port);
18
19     int ret = inet_pton(AF_INET, ip, (void *)&name.sin_addr.s_addr);
20     if(ret != 1) {
21         error("Failed to convert '%s' to a valid IPv4 address.", ip);
22         close(sock);
23         return -1;
24     }
25
26     if(connect(sock, (struct sockaddr *) &name, sizeof(name)) < 0) {
27         close(sock);
28         error("IPv4 failed to connect to '%s', port %d", ip, port);
29         return -1;
30     }
31
32     debug(D_LISTENER, "Connected to IPv4 ip '%s' port %d", ip, port);
33     return sock;
34 }
35
36 int connect_to_socket6(const char *ip, int port) {
37     int sock = -1;
38     int ipv6only = 1;
39
40     debug(D_LISTENER, "IPv6 connecting to ip '%s' port %d", ip, port);
41
42     sock = socket(AF_INET6, SOCK_STREAM, 0);
43     if (sock < 0) {
44         error("IPv6 socket() on ip '%s' port %d failed.", ip, port);
45         return -1;
46     }
47
48     /* IPv6 only */
49     if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (void*)&ipv6only, sizeof(ipv6only)) != 0)
50         error("Cannot set IPV6_V6ONLY on ip '%s' port's %d.", ip, port);
51
52     struct sockaddr_in6 name;
53     memset(&name, 0, sizeof(struct sockaddr_in6));
54     name.sin6_family = AF_INET6;
55     name.sin6_port = htons ((uint16_t) port);
56
57     int ret = inet_pton(AF_INET6, ip, (void *)&name.sin6_addr.s6_addr);
58     if(ret != 1) {
59         error("Failed to convert IP '%s' to a valid IPv6 address.", ip);
60         close(sock);
61         return -1;
62     }
63
64     name.sin6_scope_id = 0;
65
66     if(connect(sock, (struct sockaddr *)&name, sizeof(name)) < 0) {
67         close(sock);
68         error("IPv6 failed to connect to '%s', port %d", ip, port);
69         return -1;
70     }
71
72     debug(D_LISTENER, "Connected to IPv6 ip '%s' port %d", ip, port);
73     return sock;
74 }
75
76
77 static inline int connect_to_one(const char *definition, int default_port) {
78     struct addrinfo hints;
79     struct addrinfo *result = NULL, *rp = NULL;
80
81     char buffer[strlen(definition) + 1];
82     strcpy(buffer, definition);
83
84     char buffer2[10 + 1];
85     snprintfz(buffer2, 10, "%d", default_port);
86
87     char *ip = buffer, *port = buffer2;
88
89     char *e = ip;
90     if(*e == '[') {
91         e = ++ip;
92         while(*e && *e != ']') e++;
93         if(*e == ']') {
94             *e = '\0';
95             e++;
96         }
97     }
98     else {
99         while(*e && *e != ':') e++;
100     }
101
102     if(*e == ':') {
103         port = e + 1;
104         *e = '\0';
105     }
106
107     if(!*ip)
108         return -1;
109
110     if(!*port)
111         port = buffer2;
112
113     memset(&hints, 0, sizeof(struct addrinfo));
114     hints.ai_family = AF_UNSPEC;    /* Allow IPv4 or IPv6 */
115     hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
116     hints.ai_flags = AI_PASSIVE;    /* For wildcard IP address */
117     hints.ai_protocol = 0;          /* Any protocol */
118     hints.ai_canonname = NULL;
119     hints.ai_addr = NULL;
120     hints.ai_next = NULL;
121
122     int r = getaddrinfo(ip, port, &hints, &result);
123     if (r != 0) {
124         error("Cannot resolve host '%s', port '%s': %s\n", ip, port, gai_strerror(r));
125         return -1;
126     }
127
128     int fd = -1;
129     for (rp = result; rp != NULL && fd == -1; rp = rp->ai_next) {
130         char rip[INET_ADDRSTRLEN + INET6_ADDRSTRLEN] = "INVALID";
131         int rport;
132
133         switch (rp->ai_addr->sa_family) {
134             case AF_INET: {
135                 struct sockaddr_in *sin = (struct sockaddr_in *) rp->ai_addr;
136                 inet_ntop(AF_INET, &sin->sin_addr, rip, INET_ADDRSTRLEN);
137                 rport = ntohs(sin->sin_port);
138                 fd = connect_to_socket4(rip, rport);
139                 break;
140             }
141
142             case AF_INET6: {
143                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *) rp->ai_addr;
144                 inet_ntop(AF_INET6, &sin6->sin6_addr, rip, INET6_ADDRSTRLEN);
145                 rport = ntohs(sin6->sin6_port);
146                 fd = connect_to_socket6(rip, rport);
147                 break;
148             }
149         }
150     }
151
152     freeaddrinfo(result);
153
154     return fd;
155 }
156
157 static inline void format_dimension_collected_graphite_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
158     time_t last = rd->last_collected_time.tv_sec;
159     if(likely(last >= after && last < before))
160         buffer_sprintf(b, "%s.%s.%s.%s " COLLECTED_NUMBER_FORMAT " %u\n", prefix, host->hostname, st->id, rd->id, rd->last_collected_value, (uint32_t)last);
161 }
162
163 static inline void format_dimension_stored_graphite_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
164     time_t last = rd->last_collected_time.tv_sec;
165     if(likely(last >= after && last < before))
166         buffer_sprintf(b, "%s.%s.%s.%s " CALCULATED_NUMBER_FORMAT " %u\n", prefix, host->hostname, st->id, rd->id, rd->last_stored_value, (uint32_t)last);
167 }
168
169 static inline void format_dimension_collected_opentsdb_telnet(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
170     time_t last = rd->last_collected_time.tv_sec;
171     if(likely(last >= after && last < before))
172         buffer_sprintf(b, "put %s.%s.%s %u " COLLECTED_NUMBER_FORMAT " host=%s\n", prefix, st->id, rd->id, (uint32_t)last, rd->last_collected_value, host->hostname);
173 }
174
175 static inline void format_dimension_stored_opentsdb_telnet(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before) {
176     time_t last = rd->last_collected_time.tv_sec;
177     if(likely(last >= after && last < before))
178         buffer_sprintf(b, "put %s.%s.%s %u " CALCULATED_NUMBER_FORMAT " host=%s\n", prefix, st->id, rd->id, (uint32_t)last, rd->last_stored_value, host->hostname);
179 }
180
181 void *backends_main(void *ptr) {
182     (void)ptr;
183     BUFFER *b = buffer_create(1);
184     void (*formatter)(BUFFER *b, const char *prefix, RRDHOST *host, RRDSET *st, RRDDIM *rd, time_t after, time_t before);
185
186     info("BACKENDs thread created with task id %d", gettid());
187
188     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
189         error("Cannot set pthread cancel type to DEFERRED.");
190
191     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
192         error("Cannot set pthread cancel state to ENABLE.");
193
194     int default_port = 0;
195     int enabled = config_get_boolean("backends", "enable", 0);
196     const char *source = config_get("backends", "data source", "as stored");
197     const char *type = config_get("backends", "type", "graphite");
198     const char *destination = config_get("backends", "destination", "localhost");
199     const char *prefix = config_get("backends", "prefix", "netdata");
200     int frequency = (int)config_get_number("backends", "update every", 10);
201
202     if(!enabled)
203         goto cleanup;
204
205     if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
206         default_port = 2003;
207         if(!strcmp(source, "as collected"))
208             formatter = format_dimension_collected_graphite_plaintext;
209         else
210             formatter = format_dimension_stored_graphite_plaintext;
211     }
212     else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
213         default_port = 2003;
214         if(!strcmp(source, "as collected"))
215             formatter = format_dimension_collected_opentsdb_telnet;
216         else
217             formatter = format_dimension_stored_opentsdb_telnet;
218     }
219     else {
220         error("Unknown backend type '%s'", type);
221         goto cleanup;
222     }
223
224     time_t after, before = (time_t)(time_usec() / 10000000ULL);
225
226     unsigned long long step = frequency * 1000000ULL;
227     for(;;) {
228         unsigned long long now = time_usec();
229         unsigned long long next = now - (now % step) + step + (step / 2);
230
231         while(now < next) {
232             sleep_usec(next - now);
233             now = time_usec();
234         }
235
236         after = before;
237         before = (time_t)(now / 10000000ULL);
238         RRDSET *st;
239         int pthreadoldcancelstate;
240         buffer_flush(b);
241
242         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
243             error("Cannot set pthread cancel state to DISABLE.");
244
245         rrdhost_rdlock(&localhost);
246         for(st = localhost.rrdset_root; st ;st = st->next) {
247             pthread_rwlock_rdlock(&st->rwlock);
248             RRDDIM *rd;
249             for(rd = st->dimensions; rd ;rd = rd->next) {
250                 formatter(b, prefix, &localhost, st, rd, after, before);
251             }
252             pthread_rwlock_unlock(&st->rwlock);
253         }
254         rrdhost_unlock(&localhost);
255
256         if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
257             error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
258
259         if(unlikely(netdata_exit)) break;
260
261         break;
262     }
263
264 cleanup:
265     info("BACKENDs thread exiting");
266
267     pthread_exit(NULL);
268     return NULL;
269 }