]> arthur.barton.de Git - netdata.git/blobdiff - src/rrdset.c
properly initialize stream receiving thread
[netdata.git] / src / rrdset.c
index 5efc1ce3a116773a1a782910818e5d7d75d7b78a..f88ce81804b1ceec2536ec36febb6419323fb6d0 100644 (file)
@@ -196,7 +196,7 @@ void rrdset_reset(RRDSET *st) {
 // RRDSET - helpers for rrdset_create()
 
 inline long align_entries_to_pagesize(long entries) {
-    if(central_netdata_to_push_data)
+    if(rrdpush_exclusive)
         return entries;
 
     if(entries < 5) entries = 5;
@@ -495,16 +495,23 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
 // ----------------------------------------------------------------------------
 // RRDSET - data collection iteration control
 
-void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
-    if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) {
+inline void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
+
+    if(unlikely(!st->last_collected_time.tv_sec)) {
         // the first entry
         microseconds = st->update_every * USEC_PER_SEC;
     }
+    else if(unlikely(!microseconds)) {
+        // no dt given by the plugin
+        struct timeval now;
+        now_realtime_timeval(&now);
+        microseconds = dt_usec(&now, &st->last_collected_time);
+    }
+
     st->usec_since_last_update = microseconds;
 }
 
-void rrdset_next_usec(RRDSET *st, usec_t microseconds)
-{
+inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
     struct timeval now;
     now_realtime_timeval(&now);
 
@@ -522,26 +529,40 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds)
 //        usec_t now_usec = timeval_usec(&now);
 //        usec_t last_usec = timeval_usec(&st->last_collected_time);
 //#endif
-        usec_t since_last_usec = dt_usec(&now, &st->last_collected_time);
+        susec_t since_last_usec = dt_usec_signed(&now, &st->last_collected_time);
+
+        if(unlikely(since_last_usec < 0)) {
+            // oops! the database is in the future
+            error("Database for chart '%s' on host '%s' is %lld microseconds in the future.", st->id, st->rrdhost->hostname, -since_last_usec);
+            memcpy(&st->last_collected_time, &now, sizeof(struct timeval));
+            st->last_collected_time.tv_sec -= st->update_every;
+
+            memcpy(&st->last_updated, &now, sizeof(struct timeval));
+            timeval_align(&st->last_updated, st->update_every);
+            st->last_updated.tv_sec -= st->update_every;
+
+            microseconds = st->update_every * USEC_PER_SEC;
+            since_last_usec = st->update_every * USEC_PER_SEC;
+        }
 
         // verify the microseconds given is good
-        if(unlikely(microseconds > since_last_usec)) {
-            debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
+        if(unlikely(microseconds > (usec_t)since_last_usec)) {
+            debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
 
 //#ifdef NETDATA_INTERNAL_CHECKS
 //            if(unlikely(last_usec + microseconds > now_usec + 1000))
-//                error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
+//                error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
 //#endif
 
-            microseconds = since_last_usec;
+            microseconds = (usec_t)since_last_usec;
         }
-        else if(unlikely(microseconds < since_last_usec * 0.8)) {
-            debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
+        else if(unlikely(microseconds < (usec_t)since_last_usec * 0.8)) {
+            debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
 
 //#ifdef NETDATA_INTERNAL_CHECKS
-//            error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
+//            error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
 //#endif
-            microseconds = since_last_usec;
+            microseconds = (usec_t)since_last_usec;
         }
     }
     debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
@@ -556,10 +577,49 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds)
 // ----------------------------------------------------------------------------
 // RRDSET - process the collected values for all dimensions of a chart
 
+static inline void rrdset_init_last_collected_time(RRDSET *st) {
+    now_realtime_timeval(&st->last_collected_time);
+    timeval_align(&st->last_collected_time, st->update_every);
+}
+
+static inline usec_t rrdset_update_last_collected_time(RRDSET *st) {
+    usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
+    usec_t ut = last_collect_ut + st->usec_since_last_update;
+    st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
+    st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+    return last_collect_ut;
+}
+
+static inline void rrdset_init_last_updated_time(RRDSET *st) {
+    // copy the last collected time to last updated time
+    memcpy(&st->last_updated, &st->last_collected_time, sizeof(struct timeval));
+    timeval_align(&st->last_updated, st->update_every);
+    st->last_updated.tv_usec = 0;
+}
+
+static inline void rrdset_done_push_int(RRDSET *st) {
+    if(unlikely(!st->last_collected_time.tv_sec)) {
+        // it is the first entry
+        // set the last_collected_time to now
+        rrdset_init_last_collected_time(st);
+    }
+    else {
+        // it is not the first entry
+        // calculate the proper last_collected_time, using usec_since_last_update
+        rrdset_update_last_collected_time(st);
+    }
+
+    st->counter++;
+    st->counter_done++;
+
+    rrdset_done_push(st);
+}
+
 void rrdset_done(RRDSET *st) {
     if(unlikely(netdata_exit)) return;
-    if(unlikely(central_netdata_to_push_data)) {
-        rrdset_done_push(st);
+
+    if(unlikely(rrdpush_exclusive)) {
+        rrdset_done_push_int(st);
         return;
     }
 
@@ -611,8 +671,7 @@ void rrdset_done(RRDSET *st) {
     if(unlikely(!st->last_collected_time.tv_sec)) {
         // it is the first entry
         // set the last_collected_time to now
-        now_realtime_timeval(&st->last_collected_time);
-        timeval_align(&st->last_collected_time, st->update_every);
+        rrdset_init_last_collected_time(st);
 
         last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - update_every_ut;
 
@@ -626,10 +685,7 @@ void rrdset_done(RRDSET *st) {
     else {
         // it is not the first entry
         // calculate the proper last_collected_time, using usec_since_last_update
-        last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
-        usec_t ut = last_collect_ut + st->usec_since_last_update;
-        st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+        last_collect_ut = rrdset_update_last_collected_time(st);
     }
 
     // if this set has not been updated in the past
@@ -637,32 +693,24 @@ void rrdset_done(RRDSET *st) {
     if(unlikely(!st->last_updated.tv_sec)) {
         // it has never been updated before
         // set a fake last_updated, in the past using usec_since_last_update
-        usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
-        st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+        rrdset_init_last_updated_time(st);
 
         // the first entry should not be stored
         store_this_entry = 0;
         first_entry = 1;
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
-            debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
+            debug(D_RRD_STATS, "%s: initializing last_updated to last_collected_time - %llu microseconds. Will not store the next entry.", st->name, st->usec_since_last_update);
     }
 
     // check if we will re-write the entire data set
     if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) {
         info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
         rrdset_reset(st);
+        rrdset_init_last_updated_time(st);
 
         st->usec_since_last_update = update_every_ut;
 
-        now_realtime_timeval(&st->last_collected_time);
-        timeval_align(&st->last_collected_time, st->update_every);
-
-        usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
-        st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
-
         // the first entry should not be stored
         store_this_entry = 0;
         first_entry = 1;
@@ -1159,5 +1207,8 @@ void rrdset_done(RRDSET *st) {
 
     if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
         error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
+
+    if(unlikely(rrdpush_enabled))
+        rrdset_done_push_int(st);
 }