}
void *backends_main(void *ptr) {
- (void)ptr;
+ struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
BUFFER *b = buffer_create(1), *response = buffer_create(1);
int (*backend_request_formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) = NULL;
info("BACKEND configured ('%s' on '%s' sending '%s' data, every %d seconds, as host '%s', with prefix '%s')", type, destination, source, frequency, hostname, prefix);
usec_t step_ut = frequency * USEC_PER_SEC;
- usec_t random_ut = now_realtime_usec() % (step_ut / 2);
- time_t before = (time_t)((now_realtime_usec() - step_ut) / USEC_PER_SEC);
- time_t after = before;
+ time_t after = now_realtime_sec();
int failures = 0;
+ heartbeat_t hb;
+ heartbeat_init(&hb);
for(;;) {
// ------------------------------------------------------------------------
- // wait for the next iteration point
-
- usec_t now_ut = now_realtime_usec();
- usec_t next_ut = now_ut - (now_ut % step_ut) + step_ut;
- before = (time_t)(next_ut / USEC_PER_SEC);
-
- // add a little delay (1/4 of the step) plus some randomness
- next_ut += (step_ut / 4) + random_ut;
-
- while(now_ut < next_ut) {
- sleep_usec(next_ut - now_ut);
- now_ut = now_realtime_usec();
- }
+ // Wait for the next iteration point.
+ heartbeat_next(&hb, step_ut);
+ time_t before = now_realtime_sec();
// ------------------------------------------------------------------------
// add to the buffer the data we need to send to the backend
-
RRDSET *st;
int pthreadoldcancelstate;
//fprintf(stderr, "\nBACKEND BEGIN:\n%s\nBACKEND END\n", buffer_tostring(b)); // FIXME
//fprintf(stderr, "after = %lu, before = %lu\n", after, before);
+ // prepare for the next iteration
+ // to add incrementally data to buffer
+ after = before;
+
// ------------------------------------------------------------------------
// if we are connected, receive a response, without blocking
// if we are not connected, connect to a backend server
if(unlikely(sock == -1)) {
- usec_t start_ut = now_realtime_usec();
+ usec_t start_ut = now_monotonic_usec();
const char *s = destination;
while(*s) {
const char *e = s;
if(sock != -1) break;
s = e;
}
- chart_backend_latency += now_realtime_usec() - start_ut;
+ chart_backend_latency += now_monotonic_usec() - start_ut;
}
if(unlikely(netdata_exit)) break;
if(likely(sock != -1)) {
size_t len = buffer_strlen(b);
- usec_t start_ut = now_realtime_usec();
+ usec_t start_ut = now_monotonic_usec();
int flags = 0;
#ifdef MSG_NOSIGNAL
flags += MSG_NOSIGNAL;
#endif
ssize_t written = send(sock, buffer_tostring(b), len, flags);
- chart_backend_latency += now_realtime_usec() - start_ut;
+ chart_backend_latency += now_monotonic_usec() - start_ut;
if(written != -1 && (size_t)written == len) {
// we sent the data successfully
chart_transmission_successes++;
close(sock);
sock = -1;
}
-
- // either the buffer is empty
- // or is holding the data we couldn't send
- // so, make sure the next iteration will continue
- // from where we are now
- after = before;
}
else {
error("Failed to update database backend '%s'", destination);
info("BACKEND thread exiting");
+ static_thread->enabled = 0;
pthread_exit(NULL);
return NULL;
}