/*
 * arthur.barton.de Git - netdata.git/blob - src/rrdpush.c
 * switch from HTTP to stream data
 * [netdata.git] / src / rrdpush.c
 */
1 #include "common.h"
2
3 #define PIPE_READ 0
4 #define PIPE_WRITE 1
5
6 int rrdpush_pipe[2];
7
8 static BUFFER *rrdpush_buffer = NULL;
9 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
10 static RRDHOST *last_host = NULL;
11
12 static inline void rrdpush_lock() {
13     pthread_mutex_lock(&rrdpush_mutex);
14 }
15
16 static inline void rrdpush_unlock() {
17     pthread_mutex_unlock(&rrdpush_mutex);
18 }
19
20 static inline int need_to_send_chart_definitions(RRDSET *st) {
21     RRDDIM *rd;
22     rrddim_foreach_read(rd, st)
23         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && !rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
24             return 1;
25
26     return 0;
27 }
28
29 static inline void send_chart_definitions(RRDSET *st) {
30     buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
31                 , st->id
32                 , st->name
33                 , st->title
34                 , st->units
35                 , st->family
36                 , st->context
37                 , rrdset_type_name(st->chart_type)
38                 , st->priority
39                 , st->update_every
40     );
41
42     RRDDIM *rd;
43     rrddim_foreach_read(rd, st) {
44         buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
45                        , rd->id
46                        , rd->name
47                        , rrd_algorithm_name(rd->algorithm)
48                        , rd->multiplier
49                        , rd->divisor
50                        , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
51                        , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
52         );
53         rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
54     }
55 }
56
57 static inline void send_chart_metrics(RRDSET *st) {
58     buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, st->usec_since_last_update);
59
60     RRDDIM *rd;
61     rrddim_foreach_read(rd, st) {
62         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))
63             buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
64                        , rd->id
65                        , rd->collected_value
66         );
67     }
68
69     buffer_strcat(rrdpush_buffer, "END\n");
70 }
71
72 static void reset_all_charts(void) {
73     rrd_rdlock();
74
75     RRDHOST *host;
76     rrdhost_foreach_read(host) {
77         rrdhost_rdlock(host);
78
79         RRDSET *st;
80         rrdset_foreach_read(st, host) {
81             rrdset_rdlock(st);
82
83             RRDDIM *rd;
84             rrddim_foreach_read(rd, st)
85                 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
86
87             rrdset_unlock(st);
88         }
89         rrdhost_unlock(host);
90     }
91     rrd_unlock();
92
93     last_host = NULL;
94 }
95
96 void rrdset_done_push(RRDSET *st) {
97
98     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED) || !rrdpush_buffer))
99         return;
100
101     rrdpush_lock();
102     rrdset_rdlock(st);
103
104     if(st->rrdhost != last_host) {
105         buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
106         last_host = st->rrdhost;
107     }
108
109     if(need_to_send_chart_definitions(st))
110         send_chart_definitions(st);
111
112     send_chart_metrics(st);
113
114     // signal the sender there are more data
115     if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
116         error("Cannot write to internal pipe");
117
118     rrdset_unlock(st);
119     rrdpush_unlock();
120 }
121
122 void *central_netdata_push_thread(void *ptr) {
123     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
124
125     info("Central netdata push thread created with task id %d", gettid());
126
127     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
128         error("Cannot set pthread cancel type to DEFERRED.");
129
130     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
131         error("Cannot set pthread cancel state to ENABLE.");
132
133
134     rrdpush_buffer = buffer_create(1);
135
136     if(pipe(rrdpush_pipe) == -1)
137         fatal("Cannot create required pipe.");
138
139     struct timeval tv = {
140             .tv_sec = 60,
141             .tv_usec = 0
142     };
143
144     size_t begin = 0;
145     size_t max_size = 1024 * 1024;
146     size_t reconnects_counter = 0;
147     size_t sent_bytes = 0;
148     size_t sent_connection = 0;
149     int sock = -1;
150     char buffer[1000 + 1];
151
152     struct pollfd fds[2], *ifd, *ofd;
153
154     ifd = &fds[0];
155     ofd = &fds[1];
156
157     ifd->fd = rrdpush_pipe[PIPE_READ];
158     ifd->events = POLLIN;
159     ofd->events = POLLOUT;
160
161     nfds_t fdmax = 2;
162
163     for(;;) {
164         if(netdata_exit) break;
165
166         if(unlikely(sock == -1)) {
167             info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
168             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
169
170             if(unlikely(sock == -1)) {
171                 error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
172                 sleep(5);
173                 continue;
174             }
175
176             info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
177
178             char http[1000 + 1];
179             snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
180             if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
181                 close(sock);
182                 sock = -1;
183                 error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
184                 sleep(5);
185                 continue;
186             }
187
188             if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
189                 close(sock);
190                 sock = -1;
191                 error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
192                 sleep(5);
193                 continue;
194             }
195
196             if(strncmp(http, "STREAM", 6)) {
197                 close(sock);
198                 sock = -1;
199                 error("PUSH: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
200                 sleep(5);
201                 continue;
202             }
203
204             if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
205                 error("PUSH: cannot set non-blocking mode for socket.");
206
207             rrdpush_lock();
208             if(buffer_strlen(rrdpush_buffer))
209                 error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
210
211             buffer_flush(rrdpush_buffer);
212             reset_all_charts();
213             last_host = NULL;
214             rrdpush_unlock();
215             sent_connection = 0;
216         }
217
218         ifd->revents = 0;
219         ofd->revents = 0;
220         ofd->fd = sock;
221
222         if(begin < buffer_strlen(rrdpush_buffer))
223             ofd->events = POLLOUT;
224         else
225             ofd->events = 0;
226
227         if(netdata_exit) break;
228         int retval = poll(fds, fdmax, 60 * 1000);
229         if(netdata_exit) break;
230
231         if(unlikely(retval == -1)) {
232             if(errno == EAGAIN || errno == EINTR)
233                 continue;
234
235             error("PUSH: Failed to poll().");
236             close(sock);
237             sock = -1;
238             break;
239         }
240         else if(unlikely(!retval)) {
241             // timeout
242             continue;
243         }
244
245         if(ifd->revents & POLLIN) {
246             if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
247                 error("PUSH: Cannot read from internal pipe.");
248         }
249
250         if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
251             // fprintf(stderr, "PUSH BEGIN\n");
252             // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
253             // fprintf(stderr, "\nPUSH END\n");
254
255             rrdpush_lock();
256             ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
257             if(ret == -1) {
258                 if(errno != EAGAIN && errno != EINTR) {
259                     error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
260                     close(sock);
261                     sock = -1;
262                 }
263             }
264             else {
265                 sent_connection += ret;
266                 sent_bytes += ret;
267                 begin += ret;
268                 if(begin == buffer_strlen(rrdpush_buffer)) {
269                     buffer_flush(rrdpush_buffer);
270                     begin = 0;
271                 }
272             }
273             rrdpush_unlock();
274         }
275
276         // protection from overflow
277         if(rrdpush_buffer->len > max_size) {
278             errno = 0;
279             error("PUSH: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
280             if(sock != -1) {
281                 close(sock);
282                 sock = -1;
283             }
284         }
285     }
286
287     debug(D_WEB_CLIENT, "Central netdata push thread exits.");
288     if(sock != -1)
289         close(sock);
290
291     static_thread->enabled = 0;
292     pthread_exit(NULL);
293     return NULL;
294 }