]> arthur.barton.de Git - netdata.git/blob - src/rrdpush.c
implemented API authentication
[netdata.git] / src / rrdpush.c
1 #include "common.h"
2
// indexes into rrdpush_pipe[], matching the order pipe() fills the array:
// [0] is the read end, [1] is the write end
#define PIPE_READ 0
#define PIPE_WRITE 1

// internal pipe, created by the push thread with pipe();
// rrdset_done_push() writes a byte to it to wake up the push thread,
// which poll()s and drains the read end
int rrdpush_pipe[2];

// the outgoing text stream (CHART / DIMENSION / BEGIN / SET / END lines);
// created by the push thread, appended to by rrdset_done_push()
static BUFFER *rrdpush_buffer = NULL;

// taken around appends to, sends from and flushes of rrdpush_buffer
static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;

// set to 1 by the push thread once the stream is established,
// set back to 0 while it is (re)connecting; when 0, rrdset_done_push()
// discards the collected metrics instead of buffering them
static volatile int rrdpush_connected = 0;
11
12 static inline void rrdpush_lock() {
13     pthread_mutex_lock(&rrdpush_mutex);
14 }
15
16 static inline void rrdpush_unlock() {
17     pthread_mutex_unlock(&rrdpush_mutex);
18 }
19
20 static inline int need_to_send_chart_definition(RRDSET *st) {
21     RRDDIM *rd;
22     rrddim_foreach_read(rd, st)
23         if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
24             return 1;
25
26
27     // fprintf(stderr, "NOT Sending CHART '%s' '%s'\n", st->id, st->name);
28     return 0;
29 }
30
31 static inline void send_chart_definition(RRDSET *st) {
32     // fprintf(stderr, "Sending CHART '%s' '%s'\n", st->id, st->name);
33
34     buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
35                 , st->id
36                 , st->name
37                 , st->title
38                 , st->units
39                 , st->family
40                 , st->context
41                 , rrdset_type_name(st->chart_type)
42                 , st->priority
43                 , st->update_every
44     );
45
46     RRDDIM *rd;
47     rrddim_foreach_read(rd, st) {
48         buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
49                        , rd->id
50                        , rd->name
51                        , rrd_algorithm_name(rd->algorithm)
52                        , rd->multiplier
53                        , rd->divisor
54                        , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
55                        , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
56         );
57         rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
58     }
59 }
60
61 static inline void send_chart_metrics(RRDSET *st) {
62     buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > 60)?st->usec_since_last_update:0);
63
64     RRDDIM *rd;
65     rrddim_foreach_read(rd, st) {
66         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
67             buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
68                        , rd->id
69                        , rd->collected_value
70         );
71     }
72
73     buffer_strcat(rrdpush_buffer, "END\n");
74 }
75
76 static void reset_all_charts(void) {
77     rrd_rdlock();
78
79     RRDHOST *host;
80     rrdhost_foreach_read(host) {
81         rrdhost_rdlock(host);
82
83         RRDSET *st;
84         rrdset_foreach_read(st, host) {
85
86             // make it re-align the current time
87             // on the remote host
88             st->counter_done = 0;
89
90             rrdset_rdlock(st);
91
92             RRDDIM *rd;
93             rrddim_foreach_read(rd, st)
94                 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
95
96             rrdset_unlock(st);
97         }
98         rrdhost_unlock(host);
99     }
100     rrd_unlock();
101 }
102
103 void rrdset_done_push(RRDSET *st) {
104     static int error_shown = 0;
105
106     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
107         return;
108
109
110     rrdpush_lock();
111
112     if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
113         if(!error_shown)
114             error("STREAM: not ready - discarding collected metrics.");
115
116         error_shown = 1;
117
118         rrdpush_unlock();
119         return;
120     }
121     error_shown = 0;
122
123     rrdset_rdlock(st);
124     if(need_to_send_chart_definition(st))
125         send_chart_definition(st);
126
127     send_chart_metrics(st);
128     rrdset_unlock(st);
129
130     // signal the sender there are more data
131     if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
132         error("STREAM: cannot write to internal pipe");
133
134     rrdpush_unlock();
135 }
136
137 static inline void rrdpush_flush(void) {
138     rrdpush_lock();
139     if(buffer_strlen(rrdpush_buffer))
140         error("STREAM: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
141
142     buffer_flush(rrdpush_buffer);
143     reset_all_charts();
144     rrdpush_unlock();
145 }
146
147 void *central_netdata_push_thread(void *ptr) {
148     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
149
150     info("STREAM: central netdata push thread created with task id %d", gettid());
151
152     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
153         error("STREAM: cannot set pthread cancel type to DEFERRED.");
154
155     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
156         error("STREAM: cannot set pthread cancel state to ENABLE.");
157
158
159     rrdpush_buffer = buffer_create(1);
160
161     if(pipe(rrdpush_pipe) == -1)
162         fatal("STREAM: cannot create required pipe.");
163
164     struct timeval tv = {
165             .tv_sec = 60,
166             .tv_usec = 0
167     };
168
169     rrdpush_connected = 0;
170     size_t begin = 0;
171     size_t max_size = 1024 * 1024;
172     size_t reconnects_counter = 0;
173     size_t sent_bytes = 0;
174     size_t sent_connection = 0;
175     int sock = -1;
176
177     struct pollfd fds[2], *ifd, *ofd;
178     nfds_t fdmax;
179
180     ifd = &fds[0];
181     ofd = &fds[1];
182
183     for(;;) {
184         if(netdata_exit) break;
185
186         if(unlikely(sock == -1)) {
187             // stop appending data into rrdpush_buffer
188             // they will be lost, so there is no point to do it
189             rrdpush_connected = 0;
190
191             info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
192             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
193
194             if(unlikely(sock == -1)) {
195                 error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
196                 sleep(5);
197                 continue;
198             }
199
200             info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
201
202             char http[1000 + 1];
203             snprintfz(http, 1000, "GET /stream?key=%s&hostname=%s&machine_guid=%s&update_every=%d HTTP/1.1\r\n"
204                     "User-Agent: netdata-push-service/%s\r\n"
205                     "Accept: */*\r\n\r\n"
206                       , config_get("global", "central netdata api key", "")
207                       , localhost->hostname
208                       , localhost->machine_guid
209                       , default_rrd_update_every
210                       , program_version
211             );
212
213             if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
214                 close(sock);
215                 sock = -1;
216                 error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
217                 sleep(5);
218                 continue;
219             }
220
221             info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
222
223             if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
224                 close(sock);
225                 sock = -1;
226                 error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
227                 sleep(5);
228                 continue;
229             }
230
231             if(strncmp(http, "STREAM", 6)) {
232                 close(sock);
233                 sock = -1;
234                 error("STREAM: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
235                 sleep(5);
236                 continue;
237             }
238
239             info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
240
241             if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
242                 error("STREAM: cannot set non-blocking mode for socket.");
243
244             rrdpush_flush();
245             sent_connection = 0;
246
247             // allow appending data into rrdpush_buffer
248             rrdpush_connected = 1;
249         }
250
251         ifd->fd = rrdpush_pipe[PIPE_READ];
252         ifd->events = POLLIN;
253         ifd->revents = 0;
254
255         ofd->fd = sock;
256         ofd->revents = 0;
257         if(begin < buffer_strlen(rrdpush_buffer)) {
258             ofd->events = POLLOUT;
259             fdmax = 2;
260         }
261         else {
262             ofd->events = 0;
263             fdmax = 1;
264         }
265
266         if(netdata_exit) break;
267         int retval = poll(fds, fdmax, 60 * 1000);
268         if(netdata_exit) break;
269
270         if(unlikely(retval == -1)) {
271             if(errno == EAGAIN || errno == EINTR)
272                 continue;
273
274             error("STREAM: Failed to poll().");
275             close(sock);
276             sock = -1;
277             break;
278         }
279         else if(unlikely(!retval)) {
280             // timeout
281             continue;
282         }
283
284         if(ifd->revents & POLLIN) {
285             char buffer[1000 + 1];
286             if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
287                 error("STREAM: Cannot read from internal pipe.");
288         }
289
290         if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
291             // info("STREAM: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
292
293             // fprintf(stderr, "PUSH BEGIN\n");
294             // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
295             // fprintf(stderr, "\nPUSH END\n");
296
297             rrdpush_lock();
298             ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
299             if(ret == -1) {
300                 if(errno != EAGAIN && errno != EINTR) {
301                     error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
302                     close(sock);
303                     sock = -1;
304                 }
305             }
306             else {
307                 sent_connection += ret;
308                 sent_bytes += ret;
309                 begin += ret;
310                 if(begin == buffer_strlen(rrdpush_buffer)) {
311                     buffer_flush(rrdpush_buffer);
312                     begin = 0;
313                 }
314             }
315             rrdpush_unlock();
316         }
317
318         // protection from overflow
319         if(rrdpush_buffer->len > max_size) {
320             errno = 0;
321             error("STREAM: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
322             if(sock != -1) {
323                 close(sock);
324                 sock = -1;
325             }
326         }
327     }
328
329     debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
330     if(sock != -1) {
331         close(sock);
332     }
333
334     static_thread->enabled = 0;
335     pthread_exit(NULL);
336     return NULL;
337 }