if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
return;
+
+ rrdpush_lock();
+
if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
if(!error_shown)
- error("PUSH: not ready - discarding collected metrics.");
+ error("STREAM: not ready - discarding collected metrics.");
error_shown = 1;
+
+ rrdpush_unlock();
return;
}
error_shown = 0;
- rrdpush_lock();
- rrdset_rdlock(st);
-
if(st->rrdhost != last_host) {
buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
last_host = st->rrdhost;
}
+ rrdset_rdlock(st);
if(need_to_send_chart_definition(st))
send_chart_definition(st);
send_chart_metrics(st);
+ rrdset_unlock(st);
// signal the sender there are more data
if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
- error("Cannot write to internal pipe");
+ error("STREAM: cannot write to internal pipe");
- rrdset_unlock(st);
rrdpush_unlock();
}
static inline void rrdpush_flush(void) {
rrdpush_lock();
if(buffer_strlen(rrdpush_buffer))
- error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
+ error("STREAM: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
buffer_flush(rrdpush_buffer);
reset_all_charts();
void *central_netdata_push_thread(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- info("Central netdata push thread created with task id %d", gettid());
+ info("STREAM: central netdata push thread created with task id %d", gettid());
if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
- error("Cannot set pthread cancel type to DEFERRED.");
+ error("STREAM: cannot set pthread cancel type to DEFERRED.");
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("Cannot set pthread cancel state to ENABLE.");
+ error("STREAM: cannot set pthread cancel state to ENABLE.");
rrdpush_buffer = buffer_create(1);
if(pipe(rrdpush_pipe) == -1)
- fatal("Cannot create required pipe.");
+ fatal("STREAM: cannot create required pipe.");
struct timeval tv = {
.tv_sec = 60,
int sock = -1;
struct pollfd fds[2], *ifd, *ofd;
+ nfds_t fdmax;
ifd = &fds[0];
ofd = &fds[1];
if(netdata_exit) break;
if(unlikely(sock == -1)) {
+ // stop appending data into rrdpush_buffer
+ // they will be lost, so there is no point to do it
rrdpush_connected = 0;
- info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
+ info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
if(unlikely(sock == -1)) {
- error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+ error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
continue;
}
- info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
+ info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
char http[1000 + 1];
snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
close(sock);
sock = -1;
- error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
+ error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
sleep(5);
continue;
}
+ info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
+
if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
close(sock);
sock = -1;
- error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
+ error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
sleep(5);
continue;
}
if(strncmp(http, "STREAM", 6)) {
close(sock);
sock = -1;
- error("PUSH: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
+ error("STREAM: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
sleep(5);
continue;
}
+ info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
+
if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
- error("PUSH: cannot set non-blocking mode for socket.");
+ error("STREAM: cannot set non-blocking mode for socket.");
rrdpush_flush();
sent_connection = 0;
+
+ // allow appending data into rrdpush_buffer
rrdpush_connected = 1;
}
ifd->revents = 0;
ofd->fd = sock;
- ofd->events = POLLOUT;
ofd->revents = 0;
-
- nfds_t fdmax = 2;
-
- if(begin < buffer_strlen(rrdpush_buffer))
+ if(begin < buffer_strlen(rrdpush_buffer)) {
ofd->events = POLLOUT;
- else
+ fdmax = 2;
+ }
+ else {
ofd->events = 0;
+ fdmax = 1;
+ }
if(netdata_exit) break;
int retval = poll(fds, fdmax, 60 * 1000);
if(errno == EAGAIN || errno == EINTR)
continue;
- error("PUSH: Failed to poll().");
+ error("STREAM: Failed to poll().");
close(sock);
sock = -1;
break;
if(ifd->revents & POLLIN) {
char buffer[1000 + 1];
if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
- error("PUSH: Cannot read from internal pipe.");
+ error("STREAM: Cannot read from internal pipe.");
}
if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
- // info("PUSH: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
+ // info("STREAM: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
// fprintf(stderr, "PUSH BEGIN\n");
// fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
if(errno != EAGAIN && errno != EINTR) {
- error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
+ error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
close(sock);
sock = -1;
}
// protection from overflow
if(rrdpush_buffer->len > max_size) {
errno = 0;
- error("PUSH: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
+ error("STREAM: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
if(sock != -1) {
close(sock);
sock = -1;
}
}
- debug(D_WEB_CLIENT, "Central netdata push thread exits.");
+ debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
if(sock != -1) {
close(sock);
}
}
if(!key || !*key) {
- error("STREAM request from client '%s:%s', without an API key. Forbidding access.", w->client_ip, w->client_port);
+ error("STREAM [%s]:%s: request without an API key. Forbidding access.", w->client_ip, w->client_port);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "You need an API key for this request.");
return 401;
}
if(!validate_stream_api_key(key)) {
- error("STREAM request from client '%s:%s': API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+ error("STREAM [%s]:%s: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your API key is not permitted access.");
return 401;
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", w->client_ip, w->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", w->client_ip, w->client_port);
+ info("STREAM [%s]:%s: sending STREAM to initiate streaming...", w->client_ip, w->client_port);
if(send_timeout(w->ifd, "STREAM", 6, 0, 60) != 6) {
- error("Cannot send STREAM to netdata at %s:%s", w->client_ip, w->client_port);
+ error("STREAM [%s]:%s: cannot send STREAM.", w->client_ip, w->client_port);
buffer_flush(w->response.data);
- buffer_sprintf(w->response.data, "STREAM failed to reply back with STREAM");
+ buffer_sprintf(w->response.data, "Failed to reply back with STREAM");
return 400;
}
// remove the non-blocking flag from the socket
if(fcntl(w->ifd, F_SETFL, fcntl(w->ifd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
- error("STREAM from '%s:%s': cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
+ error("STREAM [%s]:%s: cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
/*
char buffer[1000 + 1];
// convert the socket to a FILE *
FILE *fp = fdopen(w->ifd, "r");
if(!fp) {
- error("STREAM from '%s:%s': failed to get a FILE for FD %d.", w->client_ip, w->client_port, w->ifd);
+ error("STREAM [%s]:%s: failed to get a FILE for FD %d.", w->client_ip, w->client_port, w->ifd);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Failed to get a FILE for an FD.");
return 500;
}
// call the plugins.d processor to receive the metrics
- info("STREAM connecting client '%s:%s' to plugins.d.", w->client_ip, w->client_port);
+ info("STREAM [%s]:%s: connecting client to plugins.d.", w->client_ip, w->client_port);
size_t count = pluginsd_process(host, &cd, fp, 1);
- error("STREAM from '%s:%s': client disconnected.", w->client_ip, w->client_port);
+ error("STREAM [%s]:%s: client disconnected.", w->client_ip, w->client_port);
// close all sockets, to let the socket worker we are done
fclose(fp);