}
void *backends_main(void *ptr) {
- (void)ptr;
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
BUFFER *b = buffer_create(1), *response = buffer_create(1);
int (*backend_request_formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) = NULL;
rrddim_add(chart_ops, "read", NULL, 1, 1, RRDDIM_ABSOLUTE);
}
+ /*
+ * this is misleading - we can only measure the time we need to send data
+ * this time is not related to the time required for the data to travel to
+ * the backend database and the time that server needed to process them
+ *
+ * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
+ *
RRDSET *chart_latency = rrdset_find("netdata.backend_latency");
if(!chart_latency) {
chart_latency = rrdset_create("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA);
rrddim_add(chart_latency, "latency", NULL, 1, 1000, RRDDIM_ABSOLUTE);
}
+ */
RRDSET *chart_rusage = rrdset_find("netdata.backend_thread_cpu");
if(!chart_rusage) {
info("BACKEND configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, frequency, hostname, prefix);
usec_t step_ut = frequency * USEC_PER_SEC;
- usec_t random_ut = now_realtime_usec() % (step_ut / 2);
- time_t before = (time_t)((now_realtime_usec() - step_ut) / USEC_PER_SEC);
- time_t after = before;
+ time_t after = now_realtime_sec();
int failures = 0;
+ heartbeat_t hb;
+ heartbeat_init(&hb);
for(;;) {
// ------------------------------------------------------------------------
- // wait for the next iteration point
-
- usec_t now_ut = now_realtime_usec();
- usec_t next_ut = now_ut - (now_ut % step_ut) + step_ut;
- before = (time_t)(next_ut / USEC_PER_SEC);
-
- // add a little delay (1/4 of the step) plus some randomness
- next_ut += (step_ut / 4) + random_ut;
-
- while(now_ut < next_ut) {
- sleep_usec(next_ut - now_ut);
- now_ut = now_realtime_usec();
- }
+ // Wait for the next iteration point.
+ heartbeat_next(&hb, step_ut);
+ time_t before = now_realtime_sec();
// ------------------------------------------------------------------------
// add to the buffer the data we need to send to the backend
-
RRDSET *st;
int pthreadoldcancelstate;
//fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b)); // FIXME
//fprintf(stderr, "after = %lu, before = %lu\n", after, before);
+ // prepare for the next iteration
+ // to add incrementally data to buffer
+ after = before;
+
// ------------------------------------------------------------------------
// if we are connected, receive a response, without blocking
if(likely(sock != -1)) {
- if(likely(fcntl(sock, F_SETFL, O_NONBLOCK) >= 0)) {
- errno = 0;
-
- // loop through to collect all data
- while(sock != -1 && errno != EWOULDBLOCK) {
- buffer_need_bytes(response, 4096);
-
- ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, O_NONBLOCK);
- if(likely(r > 0)) {
- // we received some data
- response->len += r;
- chart_received_bytes += r;
- chart_receptions++;
- }
- else if(r == 0) {
- error("Backend '%s' closed the socket", destination);
- close(sock);
- sock = -1;
- }
- else {
- // failed to receive data
- if(errno != EAGAIN && errno != EWOULDBLOCK) {
- error("Cannot receive data from backend '%s'.", destination);
- }
- }
+ errno = 0;
+
+ // loop through to collect all data
+ while(sock != -1 && errno != EWOULDBLOCK) {
+ buffer_need_bytes(response, 4096);
+
+ ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
+ if(likely(r > 0)) {
+ // we received some data
+ response->len += r;
+ chart_received_bytes += r;
+ chart_receptions++;
}
-
- if(sock != -1) {
- // unset O_NONBLOCK
- int val = fcntl(sock, F_GETFL, 0);
- if(likely(val >= 0)) {
- int flags = O_NONBLOCK;
- val &= ~flags;
- if(unlikely(fcntl(sock, F_SETFL, val) < 0))
- error("Cannot unset O_NONBLOCK from backend's '%s' socket.", destination);
+ else if(r == 0) {
+ error("Backend '%s' closed the socket", destination);
+ close(sock);
+ sock = -1;
+ }
+ else {
+ // failed to receive data
+ if(errno != EAGAIN && errno != EWOULDBLOCK) {
+ error("Cannot receive data from backend '%s'.", destination);
}
- else
- error("Cannot get active flags from backend's '%s' socket.", destination);
}
-
- // if we received data, process them
- if(buffer_strlen(response))
- backend_response_checker(response);
- }
- else {
- error("Cannot set O_NONBLOCK from backend's '%s' socket. Receiving data from backend skipped.", destination);
}
+
+ // if we received data, process them
+ if(buffer_strlen(response))
+ backend_response_checker(response);
}
// ------------------------------------------------------------------------
// if we are not connected, connect to a backend server
if(unlikely(sock == -1)) {
- usec_t start_ut = now_realtime_usec();
+ usec_t start_ut = now_monotonic_usec();
const char *s = destination;
while(*s) {
const char *e = s;
if(sock != -1) break;
s = e;
}
- chart_backend_latency += now_realtime_usec() - start_ut;
+ chart_backend_latency += now_monotonic_usec() - start_ut;
}
if(unlikely(netdata_exit)) break;
if(likely(sock != -1)) {
size_t len = buffer_strlen(b);
- usec_t start_ut = now_realtime_usec();
+ usec_t start_ut = now_monotonic_usec();
int flags = 0;
#ifdef MSG_NOSIGNAL
flags += MSG_NOSIGNAL;
#endif
ssize_t written = send(sock, buffer_tostring(b), len, flags);
- chart_backend_latency += now_realtime_usec() - start_ut;
+ chart_backend_latency += now_monotonic_usec() - start_ut;
if(written != -1 && (size_t)written == len) {
// we sent the data successfully
chart_transmission_successes++;
close(sock);
sock = -1;
}
-
- // either the buffer is empty
- // or is holding the data we couldn't send
- // so, make sure the next iteration will continue
- // from where we are now
- after = before;
}
else {
error("Failed to update database backend '%s'", destination);
rrddim_set(chart_bytes, "received", chart_received_bytes);
rrdset_done(chart_bytes);
+ /*
if(chart_latency->counter_done) rrdset_next(chart_latency);
rrddim_set(chart_latency, "latency", chart_backend_latency);
rrdset_done(chart_latency);
+ */
getrusage(RUSAGE_THREAD, &thread);
if(chart_rusage->counter_done) rrdset_next(chart_rusage);
info("BACKEND thread exiting");
+ static_thread->enabled = 0;
pthread_exit(NULL);
return NULL;
}