3 int rrdpush_enabled = 0;   // [stream] enabled - whether metrics are streamed to a remote netdata
4 int rrdpush_exclusive = 1; // [stream] exclusive - reset to 0 when streaming is disabled or not fully configured
6 static char *remote_netdata_config = NULL; // [stream] stream metrics to - destination string passed to connect_to_one_of()
7 static char *api_key = NULL;               // [stream] api key - streaming is not started when this is empty
9 #define CONNECTED_TO_SIZE 100              // capacity of the connected_to buffer filled by connect_to_one_of()
11 // data collection happens in multiple threads;
12 // each of these threads calls rrdset_done(),
13 // which in turn calls rrdset_done_push(),
14 // which writes a byte to this pipe to tell the streaming thread
15 // that more data is ready to be sent
18 int rrdpush_pipe[2] = { -1, -1 }; // created with pipe() by rrdpush_sender_thread(); -1 until then
20 // a buffer used to store the data to be sent;
21 // its format is the same one used by external plugins.
22 static BUFFER *rrdpush_buffer = NULL; // created by rrdpush_sender_thread(), freed and reset to NULL when that thread exits
24 // locking to get exclusive access to shared resources
25 // (rrdpush_pipe[PIPE_WRITE], rrdpush_buffer)
26 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
28 // set to 1 while the streaming thread is connected to a remote netdata,
29 // 0 otherwise (rrdset_done_push() discards collected metrics while it is 0).
30 static volatile int rrdpush_connected = 0; // NOTE(review): volatile does not synchronize threads; consider C11 atomics
32 // to have the remote netdata re-sync the charts to its own clock,
33 // the first this-many iterations of each chart are sent with a
34 // BEGIN line carrying 0 microseconds instead of the elapsed time
35 // (overridable with [stream] initial clock resync iterations)
36 static unsigned int remote_clock_resync_iterations = 60;
38 #define rrdpush_lock() pthread_mutex_lock(&rrdpush_mutex)     // guards rrdpush_buffer and the pipe write end
39 #define rrdpush_unlock() pthread_mutex_unlock(&rrdpush_mutex)
41 // checks if the chart definition needs to be (re)sent, i.e. if any of its dimensions is not exposed yet
42 static inline int need_to_send_chart_definition(RRDSET *st) {
44 rrddim_foreach_read(rd, st)
45 if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
51 // sends the chart definition (a CHART line plus one DIMENSION line per dimension) and marks every dimension as exposed
52 static inline void send_chart_definition(RRDSET *st) {
53 buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
60 , rrdset_type_name(st->chart_type)
66 rrddim_foreach_read(rd, st) {
67 buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
70 , rrd_algorithm_name(rd->algorithm)
73 , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
74 , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
76 rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
80 // sends the collected values of the chart's updated and exposed dimensions (BEGIN / SET / END)
81 static inline void send_chart_metrics(RRDSET *st) {
82 buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
85 rrddim_foreach_read(rd, st) {
86 if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
87 buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
93 buffer_strcat(rrdpush_buffer, "END\n");
96 // resets all the charts, so that their definitions
97 // will be resent to the central netdata (and their clocks re-synced)
98 static void reset_all_charts(void) {
102 rrdhost_foreach_read(host) {
103 rrdhost_rdlock(host);
106 rrdset_foreach_read(st, host) {
108 // make it re-align the current time
109 // on the remote host
110 st->counter_done = 0;
115 rrddim_foreach_read(rd, st)
116 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
120 rrdhost_unlock(host);
125 void rrdset_done_push(RRDSET *st) {
126 static int error_shown = 0;
128 if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
133 if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
134 if(unlikely(!error_shown))
135 error("STREAM [send]: not ready - discarding collected metrics.");
142 else if(unlikely(error_shown)) {
143 error("STREAM [send]: ready - sending metrics...");
148 if(need_to_send_chart_definition(st))
149 send_chart_definition(st);
151 send_chart_metrics(st);
154 // signal the sender there are more data
155 if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
156 error("STREAM [send]: cannot write to internal pipe");
161 static inline void rrdpush_flush(void) {
163 if(buffer_strlen(rrdpush_buffer))
164 error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(rrdpush_buffer));
166 buffer_flush(rrdpush_buffer);
172 rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled);
173 rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive);
174 remote_netdata_config = config_get("stream", "stream metrics to", "");
175 api_key = config_get("stream", "api key", "");
177 if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) {
179 rrdpush_exclusive = 0;
182 return rrdpush_enabled;
185 void *rrdpush_sender_thread(void *ptr) {
186 struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
188 info("STREAM [send]: thread created (task id %d)", gettid());
190 if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
191 error("STREAM [send]: cannot set pthread cancel type to DEFERRED.");
193 if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
194 error("STREAM [send]: cannot set pthread cancel state to ENABLE.");
196 int timeout = (int)config_get_number("stream", "timeout seconds", 60);
197 int default_port = (int)config_get_number("stream", "default port", 19999);
198 size_t max_size = (size_t)config_get_number("stream", "buffer size bytes", 1024 * 1024);
199 unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5);
200 remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations);
202 char connected_to[CONNECTED_TO_SIZE + 1] = "";
204 if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key)
207 // initialize rrdpush globals
208 rrdpush_buffer = buffer_create(1);
209 rrdpush_connected = 0;
210 if(pipe(rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe.");
212 // initialize local variables
214 size_t reconnects_counter = 0;
215 size_t sent_bytes = 0;
216 size_t sent_connection = 0;
218 struct timeval tv = {
223 struct pollfd fds[2], *ifd, *ofd;
230 if(netdata_exit) break;
232 if(unlikely(sock == -1)) {
233 // stop appending data into rrdpush_buffer
234 // they will be lost, so there is no point to do it
235 rrdpush_connected = 0;
237 info("STREAM [send to %s]: connecting...", remote_netdata_config);
238 sock = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
240 if(unlikely(sock == -1)) {
241 error("STREAM [send to %s]: failed to connect", remote_netdata_config);
242 sleep(reconnect_delay);
246 info("STREAM [send to %s]: initializing communication...", connected_to);
249 snprintfz(http, 1000,
250 "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n"
251 "User-Agent: netdata-push-service/%s\r\n"
252 "Accept: */*\r\n\r\n"
254 , localhost->hostname
255 , localhost->machine_guid
257 , default_rrd_update_every
261 if(send_timeout(sock, http, strlen(http), 0, timeout) == -1) {
264 error("STREAM [send to %s]: failed to send http header to netdata", connected_to);
265 sleep(reconnect_delay);
269 info("STREAM [send to %s]: waiting response from remote netdata...", connected_to);
271 if(recv_timeout(sock, http, 1000, 0, timeout) == -1) {
274 error("STREAM [send to %s]: failed to initialize communication", connected_to);
275 sleep(reconnect_delay);
279 if(strncmp(http, "STREAM", 6)) {
282 error("STREAM [send to %s]: server is not replying properly.", connected_to);
283 sleep(reconnect_delay);
287 info("STREAM [send to %s]: established communication - sending metrics...", connected_to);
289 if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
290 error("STREAM [send to %s]: cannot set non-blocking mode for socket.", connected_to);
295 // allow appending data into rrdpush_buffer
296 rrdpush_connected = 1;
299 ifd->fd = rrdpush_pipe[PIPE_READ];
300 ifd->events = POLLIN;
305 if(begin < buffer_strlen(rrdpush_buffer)) {
306 ofd->events = POLLOUT;
314 if(netdata_exit) break;
315 int retval = poll(fds, fdmax, timeout * 1000);
316 if(netdata_exit) break;
318 if(unlikely(retval == -1)) {
319 if(errno == EAGAIN || errno == EINTR)
322 error("STREAM [send to %s]: failed to poll().", connected_to);
327 else if(unlikely(!retval)) {
332 if(ifd->revents & POLLIN) {
333 char buffer[1000 + 1];
334 if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
335 error("STREAM [send to %s]: cannot read from internal pipe.", connected_to);
338 if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
340 ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
342 if(errno != EAGAIN && errno != EINTR) {
343 error("STREAM [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", connected_to, sent_connection);
349 sent_connection += ret;
352 if(begin == buffer_strlen(rrdpush_buffer)) {
353 buffer_flush(rrdpush_buffer);
360 // protection from overflow
361 if(rrdpush_buffer->len > max_size) {
363 error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
372 debug(D_WEB_CLIENT, "STREAM [send]: sending thread exits.");
374 // make sure the data collection threads do not write data
375 rrdpush_connected = 0;
378 if(rrdpush_pipe[PIPE_READ] != -1) close(rrdpush_pipe[PIPE_READ]);
379 if(rrdpush_pipe[PIPE_WRITE] != -1) close(rrdpush_pipe[PIPE_WRITE]);
382 if(sock != -1) close(sock);
385 buffer_free(rrdpush_buffer);
386 rrdpush_buffer = NULL;
389 static_thread->enabled = 0;
395 // ----------------------------------------------------------------------------
398 int rrdpush_receive(int fd, const char *key, const char *hostname, const char *machine_guid, const char *os, int update_every, char *client_ip, char *client_port) {
400 int history = default_rrd_history_entries;
401 RRD_MEMORY_MODE mode = default_rrd_memory_mode;
402 int health_enabled = default_health_enabled;
403 time_t alarms_delay = 60;
405 update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
406 if(update_every < 0) update_every = 1;
408 history = (int)appconfig_get_number(&stream_config, key, "default history", history);
409 history = (int)appconfig_get_number(&stream_config, machine_guid, "history", history);
410 if(history < 5) history = 5;
412 mode = rrd_memory_mode_id(appconfig_get(&stream_config, key, "default memory mode", rrd_memory_mode_name(mode)));
413 mode = rrd_memory_mode_id(appconfig_get(&stream_config, machine_guid, "memory mode", rrd_memory_mode_name(mode)));
415 health_enabled = appconfig_get_boolean_ondemand(&stream_config, key, "health enabled by default", health_enabled);
416 health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
418 alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay);
419 alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay);
421 if(!strcmp(machine_guid, "localhost"))
424 host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, health_enabled?1:0);
426 info("STREAM [receive from [%s]:%s]: metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s",
427 client_ip, client_port,
428 hostname, machine_guid,
431 rrd_memory_mode_name(mode),
432 (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
435 struct plugind cd = {
437 .update_every = default_rrd_update_every,
439 .serial_failures = 0,
440 .successful_collections = 0,
442 .started_t = now_realtime_sec(),
446 // put the client IP and port into the buffers used by plugins.d
447 snprintfz(cd.id, CONFIG_MAX_NAME, "%s:%s", client_ip, client_port);
448 snprintfz(cd.filename, FILENAME_MAX, "%s:%s", client_ip, client_port);
449 snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", client_ip, client_port);
450 snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
452 info("STREAM [receive from [%s]:%s]: initializing communication...", client_ip, client_port);
453 if(send_timeout(fd, "STREAM", 6, 0, 60) != 6) {
454 error("STREAM [receive from [%s]:%s]: cannot send STREAM command.", client_ip, client_port);
458 // remove the non-blocking flag from the socket
459 if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
460 error("STREAM [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", client_ip, client_port, fd);
462 // convert the socket to a FILE *
463 FILE *fp = fdopen(fd, "r");
465 error("STREAM [receive from [%s]:%s]: failed to get a FILE for FD %d.", client_ip, client_port, fd);
469 rrdhost_wrlock(host);
471 if(health_enabled != CONFIG_BOOLEAN_NO)
472 host->health_delay_up_to = now_realtime_sec() + alarms_delay;
473 rrdhost_unlock(host);
475 // call the plugins.d processor to receive the metrics
476 info("STREAM [receive from [%s]:%s]: receiving metrics... (host '%s', machine GUID '%s').", client_ip, client_port, host->hostname, host->machine_guid);
477 size_t count = pluginsd_process(host, &cd, fp, 1);
478 error("STREAM [receive from [%s]:%s]: disconnected (host '%s', machine GUID '%s', completed updates %zu).", client_ip, client_port, host->hostname, host->machine_guid, count);
480 rrdhost_wrlock(host);
482 if(!host->use_counter && health_enabled == CONFIG_BOOLEAN_AUTO)
483 host->health_enabled = 0;
484 rrdhost_unlock(host);
492 struct rrdpush_thread {
503 void *rrdpush_receiver_thread(void *ptr) {
504 struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
506 if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
507 error("STREAM [receive]: cannot set pthread cancel type to DEFERRED.");
509 if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
510 error("STREAM [receive]: cannot set pthread cancel state to ENABLE.");
513 info("STREAM [%s]:%s: receive thread created (task id %d)", rpt->client_ip, rpt->client_port, gettid());
514 rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->machine_guid, rpt->os, rpt->update_every, rpt->client_ip, rpt->client_port);
515 info("STREAM [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->client_ip, rpt->client_port, gettid());
519 freez(rpt->hostname);
520 freez(rpt->machine_guid);
522 freez(rpt->client_ip);
523 freez(rpt->client_port);
530 static inline int rrdpush_receive_validate_api_key(const char *key) {
531 return appconfig_get_boolean(&stream_config, key, "enabled", 0);
534 int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
537 info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port);
539 char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
540 int update_every = default_rrd_update_every;
543 char *value = mystrsep(&url, "?&");
544 if(!value || !*value) continue;
546 char *name = mystrsep(&value, "=");
547 if(!name || !*name) continue;
548 if(!value || !*value) continue;
550 if(!strcmp(name, "key"))
552 else if(!strcmp(name, "hostname"))
554 else if(!strcmp(name, "machine_guid"))
555 machine_guid = value;
556 else if(!strcmp(name, "update_every"))
557 update_every = (int)strtoul(value, NULL, 0);
558 else if(!strcmp(name, "os"))
563 error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
564 buffer_flush(w->response.data);
565 buffer_sprintf(w->response.data, "You need an API key for this request.");
569 if(!hostname || !*hostname) {
570 error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
571 buffer_flush(w->response.data);
572 buffer_sprintf(w->response.data, "You need to send a hostname too.");
576 if(!machine_guid || !*machine_guid) {
577 error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
578 buffer_flush(w->response.data);
579 buffer_sprintf(w->response.data, "You need to send a machine GUID too.");
583 if(!rrdpush_receive_validate_api_key(key)) {
584 error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
585 buffer_flush(w->response.data);
586 buffer_sprintf(w->response.data, "Your API key is not permitted access.");
590 if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
591 error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
592 buffer_flush(w->response.data);
593 buffer_sprintf(w->response.data, "Your machine guide is not permitted access.");
597 struct rrdpush_thread *rpt = mallocz(sizeof(struct rrdpush_thread));
599 rpt->key = strdupz(key);
600 rpt->hostname = strdupz(hostname);
601 rpt->machine_guid = strdupz(machine_guid);
602 rpt->os = strdupz(os);
603 rpt->client_ip = strdupz(w->client_ip);
604 rpt->client_port = strdupz(w->client_port);
605 rpt->update_every = update_every;
608 debug(D_SYSTEM, "STREAM [receive from [%s]:%s]: starting receiving thread.", w->client_ip, w->client_port);
610 if(pthread_create(&thread, NULL, rrdpush_receiver_thread, (void *)rpt))
611 error("STREAM [receive from [%s]:%s]: failed to create new thread for client.", w->client_ip, w->client_port);
613 else if(pthread_detach(thread))
614 error("STREAM [receive from [%s]:%s]: cannot request detach newly created thread.", w->client_ip, w->client_port);
616 // prevent the caller from closing the streaming socket
618 w->ifd = w->ofd = -1;
622 buffer_flush(w->response.data);