]> arthur.barton.de Git - netdata.git/blob - src/rrdpush.c
do not try to reconnect too soon
[netdata.git] / src / rrdpush.c
1 #include "common.h"
2
3 #define PIPE_READ 0
4 #define PIPE_WRITE 1
5
6 int rrdpush_pipe[2];
7
8 static BUFFER *rrdpush_buffer = NULL;
9 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
10 static volatile RRDHOST *last_host = NULL;
11 static volatile int rrdpush_connected = 0;
12
13 static inline void rrdpush_lock() {
14     pthread_mutex_lock(&rrdpush_mutex);
15 }
16
17 static inline void rrdpush_unlock() {
18     pthread_mutex_unlock(&rrdpush_mutex);
19 }
20
21 static inline int need_to_send_chart_definition(RRDSET *st) {
22     RRDDIM *rd;
23     rrddim_foreach_read(rd, st)
24         if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
25             return 1;
26
27
28     // fprintf(stderr, "NOT Sending CHART '%s' '%s'\n", st->id, st->name);
29     return 0;
30 }
31
32 static inline void send_chart_definition(RRDSET *st) {
33     // fprintf(stderr, "Sending CHART '%s' '%s'\n", st->id, st->name);
34
35     buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
36                 , st->id
37                 , st->name
38                 , st->title
39                 , st->units
40                 , st->family
41                 , st->context
42                 , rrdset_type_name(st->chart_type)
43                 , st->priority
44                 , st->update_every
45     );
46
47     RRDDIM *rd;
48     rrddim_foreach_read(rd, st) {
49         buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
50                        , rd->id
51                        , rd->name
52                        , rrd_algorithm_name(rd->algorithm)
53                        , rd->multiplier
54                        , rd->divisor
55                        , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
56                        , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
57         );
58         rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
59     }
60 }
61
62 static inline void send_chart_metrics(RRDSET *st) {
63     buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, st->usec_since_last_update);
64
65     RRDDIM *rd;
66     rrddim_foreach_read(rd, st) {
67         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
68             buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
69                        , rd->id
70                        , rd->collected_value
71         );
72     }
73
74     buffer_strcat(rrdpush_buffer, "END\n");
75 }
76
77 static void reset_all_charts(void) {
78     rrd_rdlock();
79
80     RRDHOST *host;
81     rrdhost_foreach_read(host) {
82         rrdhost_rdlock(host);
83
84         RRDSET *st;
85         rrdset_foreach_read(st, host) {
86             rrdset_rdlock(st);
87
88             RRDDIM *rd;
89             rrddim_foreach_read(rd, st)
90                 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
91
92             rrdset_unlock(st);
93         }
94         rrdhost_unlock(host);
95     }
96     rrd_unlock();
97
98     last_host = NULL;
99 }
100
101 void rrdset_done_push(RRDSET *st) {
102     static int error_shown = 0;
103
104     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
105         return;
106
107
108     rrdpush_lock();
109
110     if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
111         if(!error_shown)
112             error("STREAM: not ready - discarding collected metrics.");
113
114         error_shown = 1;
115
116         rrdpush_unlock();
117         return;
118     }
119     error_shown = 0;
120
121     if(st->rrdhost != last_host) {
122         buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
123         last_host = st->rrdhost;
124     }
125
126     rrdset_rdlock(st);
127     if(need_to_send_chart_definition(st))
128         send_chart_definition(st);
129
130     send_chart_metrics(st);
131     rrdset_unlock(st);
132
133     // signal the sender there are more data
134     if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
135         error("STREAM: cannot write to internal pipe");
136
137     rrdpush_unlock();
138 }
139
140 static inline void rrdpush_flush(void) {
141     rrdpush_lock();
142     if(buffer_strlen(rrdpush_buffer))
143         error("STREAM: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
144
145     buffer_flush(rrdpush_buffer);
146     reset_all_charts();
147     last_host = NULL;
148     rrdpush_unlock();
149 }
150
151 void *central_netdata_push_thread(void *ptr) {
152     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
153
154     info("STREAM: central netdata push thread created with task id %d", gettid());
155
156     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
157         error("STREAM: cannot set pthread cancel type to DEFERRED.");
158
159     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
160         error("STREAM: cannot set pthread cancel state to ENABLE.");
161
162
163     rrdpush_buffer = buffer_create(1);
164
165     if(pipe(rrdpush_pipe) == -1)
166         fatal("STREAM: cannot create required pipe.");
167
168     struct timeval tv = {
169             .tv_sec = 60,
170             .tv_usec = 0
171     };
172
173     rrdpush_connected = 0;
174     size_t begin = 0;
175     size_t max_size = 1024 * 1024;
176     size_t reconnects_counter = 0;
177     size_t sent_bytes = 0;
178     size_t sent_connection = 0;
179     int sock = -1;
180
181     struct pollfd fds[2], *ifd, *ofd;
182     nfds_t fdmax;
183
184     ifd = &fds[0];
185     ofd = &fds[1];
186
187     for(;;) {
188         if(netdata_exit) break;
189
190         if(unlikely(sock == -1)) {
191             // stop appending data into rrdpush_buffer
192             // they will be lost, so there is no point to do it
193             rrdpush_connected = 0;
194
195             info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
196             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
197
198             if(unlikely(sock == -1)) {
199                 error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
200                 sleep(5);
201                 continue;
202             }
203
204             info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
205
206             char http[1000 + 1];
207             snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
208             if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
209                 close(sock);
210                 sock = -1;
211                 error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
212                 sleep(5);
213                 continue;
214             }
215
216             info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
217
218             if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
219                 close(sock);
220                 sock = -1;
221                 error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
222                 sleep(5);
223                 continue;
224             }
225
226             if(strncmp(http, "STREAM", 6)) {
227                 close(sock);
228                 sock = -1;
229                 error("STREAM: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
230                 sleep(5);
231                 continue;
232             }
233
234             info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
235
236             if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
237                 error("STREAM: cannot set non-blocking mode for socket.");
238
239             rrdpush_flush();
240             sent_connection = 0;
241
242             // allow appending data into rrdpush_buffer
243             rrdpush_connected = 1;
244         }
245
246         ifd->fd = rrdpush_pipe[PIPE_READ];
247         ifd->events = POLLIN;
248         ifd->revents = 0;
249
250         ofd->fd = sock;
251         ofd->revents = 0;
252         if(begin < buffer_strlen(rrdpush_buffer)) {
253             ofd->events = POLLOUT;
254             fdmax = 2;
255         }
256         else {
257             ofd->events = 0;
258             fdmax = 1;
259         }
260
261         if(netdata_exit) break;
262         int retval = poll(fds, fdmax, 60 * 1000);
263         if(netdata_exit) break;
264
265         if(unlikely(retval == -1)) {
266             if(errno == EAGAIN || errno == EINTR)
267                 continue;
268
269             error("STREAM: Failed to poll().");
270             close(sock);
271             sock = -1;
272             break;
273         }
274         else if(unlikely(!retval)) {
275             // timeout
276             continue;
277         }
278
279         if(ifd->revents & POLLIN) {
280             char buffer[1000 + 1];
281             if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
282                 error("STREAM: Cannot read from internal pipe.");
283         }
284
285         if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
286             // info("STREAM: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
287
288             // fprintf(stderr, "PUSH BEGIN\n");
289             // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
290             // fprintf(stderr, "\nPUSH END\n");
291
292             rrdpush_lock();
293             ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
294             if(ret == -1) {
295                 if(errno != EAGAIN && errno != EINTR) {
296                     error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
297                     close(sock);
298                     sock = -1;
299                 }
300             }
301             else {
302                 sent_connection += ret;
303                 sent_bytes += ret;
304                 begin += ret;
305                 if(begin == buffer_strlen(rrdpush_buffer)) {
306                     buffer_flush(rrdpush_buffer);
307                     begin = 0;
308                 }
309             }
310             rrdpush_unlock();
311         }
312
313         // protection from overflow
314         if(rrdpush_buffer->len > max_size) {
315             errno = 0;
316             error("STREAM: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
317             if(sock != -1) {
318                 close(sock);
319                 sock = -1;
320             }
321         }
322     }
323
324     debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
325     if(sock != -1) {
326         close(sock);
327     }
328
329     static_thread->enabled = 0;
330     pthread_exit(NULL);
331     return NULL;
332 }