]> arthur.barton.de Git - netdata.git/blobdiff - src/rrdset.c
fix numa plugin to work only on the expected values; fixes #1868
[netdata.git] / src / rrdset.c
index 5efc1ce3a116773a1a782910818e5d7d75d7b78a..13d6014e8abf5af92c54a6dd479c29551162ec1a 100644 (file)
@@ -187,7 +187,7 @@ void rrdset_reset(RRDSET *st) {
     rrddim_foreach_read(rd, st) {
         rd->last_collected_time.tv_sec = 0;
         rd->last_collected_time.tv_usec = 0;
-        rd->counter = 0;
+        rd->collections_counter = 0;
         memset(rd->values, 0, rd->entries * sizeof(storage_number));
     }
 }
@@ -195,16 +195,16 @@ void rrdset_reset(RRDSET *st) {
 // ----------------------------------------------------------------------------
 // RRDSET - helpers for rrdset_create()
 
-inline long align_entries_to_pagesize(long entries) {
-    if(central_netdata_to_push_data)
-        return entries;
+inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) {
+    if(unlikely(entries < 5)) entries = 5;
+    if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX;
 
-    if(entries < 5) entries = 5;
-    if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX;
+    if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_RAM))
+        return entries;
 
     long page = (size_t)sysconf(_SC_PAGESIZE);
     long size = sizeof(RRDDIM) + entries * sizeof(storage_number);
-    if(size % page) {
+    if(unlikely(size % page)) {
         size -= (size % page);
         size += page;
 
@@ -215,11 +215,16 @@ inline long align_entries_to_pagesize(long entries) {
     return entries;
 }
 
-static inline void timeval_align(struct timeval *tv, int update_every) {
+static inline void last_collected_time_align(struct timeval *tv, int update_every) {
     tv->tv_sec -= tv->tv_sec % update_every;
     tv->tv_usec = 500000;
 }
 
+static inline void last_updated_time_align(struct timeval *tv, int update_every) {
+    tv->tv_sec -= tv->tv_sec % update_every;
+    tv->tv_usec = 0;
+}
+
 // ----------------------------------------------------------------------------
 // RRDSET - free a chart
 
@@ -322,22 +327,27 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
     // get the options from the config, we need to create it
 
     long rentries = config_get_number(config_section, "history", host->rrd_history_entries);
-    long entries = align_entries_to_pagesize(rentries);
+    long entries = align_entries_to_pagesize(host->rrd_memory_mode, rentries);
     if(entries != rentries) entries = config_set_number(config_section, "history", entries);
 
+    if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
+        entries = config_set_number(config_section, "history", 10);
+
     int enabled = config_get_boolean(config_section, "enabled", 1);
     if(!enabled) entries = 5;
 
     unsigned long size = sizeof(RRDSET);
     char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
 
+    time_t now = now_realtime_sec();
+
     // ------------------------------------------------------------------------
     // load it or allocate it
 
     debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
 
     snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
-    if(host->rrd_memory_mode != RRD_MEMORY_MODE_RAM) {
+    if(host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) {
         st = (RRDSET *) mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0);
         if(st) {
             memset(&st->avl, 0, sizeof(avl));
@@ -380,15 +390,21 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
                 error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
                 memset(st, 0, size);
             }
-            else if((now_realtime_sec() - st->last_updated.tv_sec) > update_every * entries) {
+            else if((now - st->last_updated.tv_sec) > update_every * entries) {
                 errno = 0;
                 error("File %s is too old. Clearing it.", fullfilename);
                 memset(st, 0, size);
             }
+            else if(st->last_updated.tv_sec > now + update_every) {
+                errno = 0;
+                error("File %s refers to the future. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
 
             // make sure the database is aligned
             if(st->last_updated.tv_sec)
-                timeval_align(&st->last_updated, update_every);
+                last_updated_time_align(&st->last_updated, update_every);
+
 
             // make sure we have the right memory mode
             // even if we cleared the memory
@@ -398,7 +414,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
 
     if(unlikely(!st)) {
         st = callocz(1, size);
-        st->rrd_memory_mode = RRD_MEMORY_MODE_RAM;
+        st->rrd_memory_mode = (host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_RAM;
     }
 
     st->config_section = strdup(config_section);
@@ -495,16 +511,23 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
 // ----------------------------------------------------------------------------
 // RRDSET - data collection iteration control
 
-void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
-    if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) {
+inline void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
+
+    if(unlikely(!st->last_collected_time.tv_sec)) {
         // the first entry
         microseconds = st->update_every * USEC_PER_SEC;
     }
+    else if(unlikely(!microseconds)) {
+        // no dt given by the plugin
+        struct timeval now;
+        now_realtime_timeval(&now);
+        microseconds = dt_usec(&now, &st->last_collected_time);
+    }
+
     st->usec_since_last_update = microseconds;
 }
 
-void rrdset_next_usec(RRDSET *st, usec_t microseconds)
-{
+inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
     struct timeval now;
     now_realtime_timeval(&now);
 
@@ -522,26 +545,42 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds)
 //        usec_t now_usec = timeval_usec(&now);
 //        usec_t last_usec = timeval_usec(&st->last_collected_time);
 //#endif
-        usec_t since_last_usec = dt_usec(&now, &st->last_collected_time);
+        susec_t since_last_usec = dt_usec_signed(&now, &st->last_collected_time);
+
+        if(unlikely(since_last_usec < 0)) {
+            // oops! the database is in the future
+            error("Database for chart '%s' on host '%s' is %lld microseconds in the future. Adjusting it to current time.", st->id, st->rrdhost->hostname, -since_last_usec);
+
+            st->last_collected_time.tv_sec  = now.tv_sec - st->update_every;
+            st->last_collected_time.tv_usec = now.tv_usec;
+            last_collected_time_align(&st->last_collected_time, st->update_every);
+
+            st->last_updated.tv_sec  = now.tv_sec - st->update_every;
+            st->last_updated.tv_usec = now.tv_usec;
+            last_updated_time_align(&st->last_updated, st->update_every);
+
+            microseconds    = st->update_every * USEC_PER_SEC;
+            since_last_usec = st->update_every * USEC_PER_SEC;
+        }
 
         // verify the microseconds given is good
-        if(unlikely(microseconds > since_last_usec)) {
-            debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
+        if(unlikely(microseconds > (usec_t)since_last_usec)) {
+            debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
 
 //#ifdef NETDATA_INTERNAL_CHECKS
 //            if(unlikely(last_usec + microseconds > now_usec + 1000))
-//                error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
+//                error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
 //#endif
 
-            microseconds = since_last_usec;
+            microseconds = (usec_t)since_last_usec;
         }
-        else if(unlikely(microseconds < since_last_usec * 0.8)) {
-            debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
+        else if(unlikely(microseconds < (usec_t)since_last_usec * 0.8)) {
+            debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
 
 //#ifdef NETDATA_INTERNAL_CHECKS
-//            error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
+//            error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
 //#endif
-            microseconds = since_last_usec;
+            microseconds = (usec_t)since_last_usec;
         }
     }
     debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
@@ -556,10 +595,52 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds)
 // ----------------------------------------------------------------------------
 // RRDSET - process the collected values for all dimensions of a chart
 
+static inline void rrdset_init_last_collected_time(RRDSET *st) {
+    now_realtime_timeval(&st->last_collected_time);
+    last_collected_time_align(&st->last_collected_time, st->update_every);
+}
+
+static inline usec_t rrdset_update_last_collected_time(RRDSET *st) {
+    usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
+    usec_t ut = last_collect_ut + st->usec_since_last_update;
+    st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
+    st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+    return last_collect_ut;
+}
+
+static inline void rrdset_init_last_updated_time(RRDSET *st) {
+    // copy the last collected time to last updated time
+    st->last_updated.tv_sec  = st->last_collected_time.tv_sec;
+    st->last_updated.tv_usec = st->last_collected_time.tv_usec;
+    last_updated_time_align(&st->last_updated, st->update_every);
+}
+
+static inline void rrdset_done_push_exclusive(RRDSET *st) {
+    if(unlikely(!st->last_collected_time.tv_sec)) {
+        // it is the first entry
+        // set the last_collected_time to now
+        rrdset_init_last_collected_time(st);
+    }
+    else {
+        // it is not the first entry
+        // calculate the proper last_collected_time, using usec_since_last_update
+        rrdset_update_last_collected_time(st);
+    }
+
+    st->counter_done++;
+
+    rrdset_rdlock(st);
+    rrdset_done_push(st);
+    rrdset_unlock(st);
+}
+
 void rrdset_done(RRDSET *st) {
     if(unlikely(netdata_exit)) return;
-    if(unlikely(central_netdata_to_push_data)) {
-        rrdset_done_push(st);
+
+    if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) {
+        if(unlikely(st->rrdhost->rrdpush_enabled))
+            rrdset_done_push_exclusive(st);
+
         return;
     }
 
@@ -611,8 +692,7 @@ void rrdset_done(RRDSET *st) {
     if(unlikely(!st->last_collected_time.tv_sec)) {
         // it is the first entry
         // set the last_collected_time to now
-        now_realtime_timeval(&st->last_collected_time);
-        timeval_align(&st->last_collected_time, st->update_every);
+        rrdset_init_last_collected_time(st);
 
         last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - update_every_ut;
 
@@ -626,10 +706,7 @@ void rrdset_done(RRDSET *st) {
     else {
         // it is not the first entry
         // calculate the proper last_collected_time, using usec_since_last_update
-        last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
-        usec_t ut = last_collect_ut + st->usec_since_last_update;
-        st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+        last_collect_ut = rrdset_update_last_collected_time(st);
     }
 
     // if this set has not been updated in the past
@@ -637,32 +714,24 @@ void rrdset_done(RRDSET *st) {
     if(unlikely(!st->last_updated.tv_sec)) {
         // it has never been updated before
         // set a fake last_updated, in the past using usec_since_last_update
-        usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
-        st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+        rrdset_init_last_updated_time(st);
 
         // the first entry should not be stored
         store_this_entry = 0;
         first_entry = 1;
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
-            debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
+            debug(D_RRD_STATS, "%s: initializing last_updated to last_collected_time - %llu microseconds. Will not store the next entry.", st->name, st->usec_since_last_update);
     }
 
     // check if we will re-write the entire data set
     if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) {
         info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
         rrdset_reset(st);
+        rrdset_init_last_updated_time(st);
 
         st->usec_since_last_update = update_every_ut;
 
-        now_realtime_timeval(&st->last_collected_time);
-        timeval_align(&st->last_collected_time, st->update_every);
-
-        usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
-        st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
-
         // the first entry should not be stored
         store_this_entry = 0;
         first_entry = 1;
@@ -690,12 +759,15 @@ void rrdset_done(RRDSET *st) {
     }
     st->counter_done++;
 
+    if(unlikely(st->rrdhost->rrdpush_enabled))
+        rrdset_done_push(st);
+
     // calculate totals and count the dimensions
     int dimensions = 0;
     st->collected_total = 0;
     rrddim_foreach_read(rd, st) {
         dimensions++;
-        if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
+        if(likely(rd->updated))
             st->collected_total += rd->collected_value;
     }
 
@@ -706,7 +778,7 @@ void rrdset_done(RRDSET *st) {
     // at this stage we do not interpolate anything
     rrddim_foreach_read(rd, st) {
 
-        if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) {
+        if(unlikely(!rd->updated)) {
             rd->calculated_value = 0;
             continue;
         }
@@ -768,7 +840,7 @@ void rrdset_done(RRDSET *st) {
                 break;
 
             case RRD_ALGORITHM_INCREMENTAL:
-                if(unlikely(rd->counter <= 1)) {
+                if(unlikely(rd->collections_counter <= 1)) {
                     rd->calculated_value = 0;
                     continue;
                 }
@@ -808,7 +880,7 @@ void rrdset_done(RRDSET *st) {
                 break;
 
             case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
-                if(unlikely(rd->counter <= 1)) {
+                if(unlikely(rd->collections_counter <= 1)) {
                     rd->calculated_value = 0;
                     continue;
                 }
@@ -992,7 +1064,7 @@ void rrdset_done(RRDSET *st) {
                 continue;
             }
 
-            if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
+            if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
                 rd->last_stored_value = new_value;
 
@@ -1055,7 +1127,7 @@ void rrdset_done(RRDSET *st) {
     st->last_collected_total  = st->collected_total;
 
     rrddim_foreach_read(rd, st) {
-        if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
+        if(unlikely(!rd->updated))
             continue;
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
@@ -1087,7 +1159,7 @@ void rrdset_done(RRDSET *st) {
 
         rd->calculated_value = 0;
         rd->collected_value = 0;
-        rrddim_flag_clear(rd, RRDDIM_FLAG_UPDATED);
+        rd->updated = 0;
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
             debug(D_RRD_STATS, "%s/%s: END "
@@ -1160,4 +1232,3 @@ void rrdset_done(RRDSET *st) {
     if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
         error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
 }
-