size_t sent_bytes = 0;
size_t sent_connection = 0;
int sock = -1;
- char buffer[1];
+ char buffer[1000 + 1];
+
+ struct pollfd fds[2], *ifd, *ofd;
+
+ ifd = &fds[0];
+ ofd = &fds[1];
+
+ ifd->fd = rrdpush_pipe[PIPE_READ];
+ ifd->events = POLLIN;
+ ofd->events = POLLOUT;
+
+ nfds_t fdmax = 2;
for(;;) {
+ if(netdata_exit) break;
+
if(unlikely(sock == -1)) {
info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
- if(unlikely(sock != -1)) {
- info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
+ if(unlikely(sock == -1)) {
+ error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
+ continue;
+ }
+
+ info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
- if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
- error("PUSH: cannot set non-blocking mode for socket.");
+ char http[1000 + 1];
+ snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
+ if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
+ close(sock);
+ sock = -1;
+ error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
+ continue;
}
- else
- error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+
+ if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
+ close(sock);
+ sock = -1;
+ error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
+ continue;
+ }
+
+ if(strncmp(http, "STREAM", 6)) {
+ close(sock);
+ sock = -1;
+ error("PUSH: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
+ sleep(5);
+ continue;
+ }
+
+ if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+ error("PUSH: cannot set non-blocking mode for socket.");
rrdpush_lock();
if(buffer_strlen(rrdpush_buffer))
error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
buffer_flush(rrdpush_buffer);
- buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), VERSION);
reset_all_charts();
+ last_host = NULL;
rrdpush_unlock();
sent_connection = 0;
}
- if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
- error("PUSH: Cannot read from internal pipe.");
- sleep(1);
+ ifd->revents = 0;
+ ofd->revents = 0;
+ ofd->fd = sock;
+
+ if(begin < buffer_strlen(rrdpush_buffer))
+ ofd->events = POLLOUT;
+ else
+ ofd->events = 0;
+
+ if(netdata_exit) break;
+ int retval = poll(fds, fdmax, 60 * 1000);
+ if(netdata_exit) break;
+
+ if(unlikely(retval == -1)) {
+ if(errno == EAGAIN || errno == EINTR)
+ continue;
+
+ error("PUSH: Failed to poll().");
+ close(sock);
+ sock = -1;
+ break;
+ }
+ else if(unlikely(!retval)) {
+ // timeout
+ continue;
+ }
+
+ if(ifd->revents & POLLIN) {
+ if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
+ error("PUSH: Cannot read from internal pipe.");
}
- if(likely(sock != -1 && begin < rrdpush_buffer->len)) {
+ if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
// fprintf(stderr, "PUSH BEGIN\n");
- // fwrite(&rrdpush_buffer->buffer[begin], 1, rrdpush_buffer->len - begin, stderr);
+ // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
// fprintf(stderr, "\nPUSH END\n");
rrdpush_lock();
- ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len - begin, MSG_DONTWAIT);
+ ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
- if(errno != EAGAIN) {
+ if(errno != EAGAIN && errno != EINTR) {
error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
close(sock);
sock = -1;
sent_connection += ret;
sent_bytes += ret;
begin += ret;
- if(begin == rrdpush_buffer->len) {
+ if(begin == buffer_strlen(rrdpush_buffer)) {
buffer_flush(rrdpush_buffer);
begin = 0;
}
return sock;
}
+
+// Wait up to `timeout` seconds for `sockfd` to become readable, then call
+// recv() once.
+//
+// sockfd  - descriptor to read from
+// buf/len - destination buffer and its capacity, passed straight to recv()
+// flags   - recv() flags, passed through unchanged
+// timeout - seconds to wait; converted to milliseconds for poll()
+//
+// Returns the result of recv() once the descriptor is ready, 0 when poll()
+// times out, or -1 (errno set) when poll() fails or the descriptor is invalid.
+//
+// NOTE: a timeout returns 0, which is also what recv() returns when the peer
+// has closed the connection; callers that only test for -1 will not notice
+// a timeout.
+ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout) {
+    for(;;) {
+        struct pollfd fd = {
+            .fd = sockfd,
+            .events = POLLIN,
+            .revents = 0
+        };
+
+        errno = 0;
+        int retval = poll(&fd, 1, timeout * 1000);
+
+        if(retval == -1) {
+            // interrupted or temporarily out of resources: wait again
+            // (the full timeout is restarted on every retry)
+            if(errno == EINTR || errno == EAGAIN)
+                continue;
+
+            return -1;
+        }
+
+        if(!retval) {
+            // timeout
+            return 0;
+        }
+
+        // inspect what poll() REPORTED (revents), not what we asked for (events)
+        if(fd.revents & POLLNVAL) {
+            // the descriptor is not open - there is nothing to receive from
+            errno = EBADF;
+            return -1;
+        }
+
+        // POLLIN: data is available.
+        // POLLERR / POLLHUP: waiting longer is pointless, let recv() report
+        // the error or the end of stream to the caller.
+        if(fd.revents & (POLLIN | POLLERR | POLLHUP)) break;
+    }
+
+    return recv(sockfd, buf, len, flags);
+}
+
+// Wait up to `timeout` seconds for `sockfd` to become writable, then call
+// send() once.
+//
+// sockfd  - descriptor to write to
+// buf/len - data to send and its size, passed straight to send()
+// flags   - send() flags, passed through unchanged
+// timeout - seconds to wait; converted to milliseconds for poll()
+//
+// Returns the result of send() once the descriptor is ready (this may be
+// fewer than `len` bytes - only one send() is attempted), 0 when poll()
+// times out, or -1 (errno set) when poll() fails or the descriptor is invalid.
+//
+// NOTE: a timeout returns 0, not -1; callers that only test for -1 will
+// treat a timeout as success, so compare the result against `len` instead.
+ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout) {
+    for(;;) {
+        struct pollfd fd = {
+            .fd = sockfd,
+            .events = POLLOUT,
+            .revents = 0
+        };
+
+        errno = 0;
+        int retval = poll(&fd, 1, timeout * 1000);
+
+        if(retval == -1) {
+            // interrupted or temporarily out of resources: wait again
+            // (the full timeout is restarted on every retry)
+            if(errno == EINTR || errno == EAGAIN)
+                continue;
+
+            return -1;
+        }
+
+        if(!retval) {
+            // timeout
+            return 0;
+        }
+
+        // inspect what poll() REPORTED (revents), not what we asked for (events)
+        if(fd.revents & POLLNVAL) {
+            // the descriptor is not open - there is nothing to send to
+            errno = EBADF;
+            return -1;
+        }
+
+        // POLLOUT: there is room to write.
+        // POLLERR / POLLHUP: waiting longer is pointless, let send() report
+        // the failure to the caller.
+        if(fd.revents & (POLLOUT | POLLERR | POLLHUP)) break;
+    }
+
+    return send(sockfd, buf, len, flags);
+}
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", w->client_ip, w->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", w->client_ip, w->client_port);
+ if(send_timeout(w->ifd, "STREAM", 6, 0, 60) != 6) {
+ error("Cannot send STREAM to netdata at %s:%s", w->client_ip, w->client_port);
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "STREAM failed to reply back with STREAM");
+ return 400;
+ }
+
// remove the non-blocking flag from the socket
if(fcntl(w->ifd, F_SETFL, fcntl(w->ifd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
error("STREAM from '%s:%s': cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
+ /*
+ char buffer[1000 + 1];
+ ssize_t len;
+ while((len = read(w->ifd, buffer, 1000)) != -1) {
+ buffer[len] = '\0';
+ fprintf(stderr, "BEGIN READ %zu bytes\n%s\nEND READ\n", (size_t)len, buffer);
+ }
+ */
+
// convert the socket to a FILE *
FILE *fp = fdopen(w->ifd, "r");
if(!fp) {