]> arthur.barton.de Git - netdata.git/commitdiff
new mechanism for two stage processing of web output data - still in progress
authorCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Sat, 24 Oct 2015 22:46:42 +0000 (01:46 +0300)
committerCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Sat, 24 Oct 2015 22:46:42 +0000 (01:46 +0300)
src/Makefile
src/rrd.c
src/rrd.h
src/rrd2json.c

index a097b0d09ebde89312d0663a245d9b8f8664085d..8c5bbba3278912704c5f3a80df9fbdd34c606992 100755 (executable)
@@ -29,7 +29,7 @@ endif
 COMMON_FLAGS = BIN_DIR='$(BIN_DIR)' CONFIG_DIR='$(CONFIG_DIR)' LOG_DIR='$(LOG_DIR)' PLUGINS_DIR='$(PLUGINS_DIR)'
 
 proc_sources = proc_net_dev.c proc_net_ip_vs_stats.c proc_diskstats.c proc_meminfo.c proc_net_netstat.c proc_net_snmp.c proc_net_stat_conntrack.c proc_stat.c proc_vmstat.c proc_net_rpc_nfsd.c proc_sys_kernel_random_entropy_avail.c proc_interrupts.c
-sources = avl.c dictionary.c procfile.c common.c log.c popen.c url.c config.c web_buffer.c storage_number.c web_client.c global_statistics.c rrd.c rrd2json.c web_server.c plugins_d.c daemon.c plugin_tc.c plugin_checks.c plugin_idlejitter.c plugin_proc.c unit_test.c main.c
+sources = rrd.c rrd2json.c avl.c dictionary.c procfile.c common.c log.c popen.c url.c config.c web_buffer.c storage_number.c web_client.c global_statistics.c web_server.c plugins_d.c daemon.c plugin_tc.c plugin_checks.c plugin_idlejitter.c plugin_proc.c unit_test.c main.c
 libs    = -pthread
 
 ifdef debug
index 2a4cd836b8b3cab605a9daf779a207f2d564a36d..20b958f464c5b2ce1037c6d6109ea952a8645f68 100755 (executable)
--- a/src/rrd.c
+++ b/src/rrd.c
@@ -1141,15 +1141,8 @@ unsigned long long rrdset_done(RRDSET *st)
                // reset the storage flags for the next point, if any;
                storage_flags = SN_EXISTS;
 
-               if(unlikely(st->first_entry_t && st->counter >= (unsigned long long)st->entries)) {
-                       // the db is overwriting values
-                       // add the value we will overwrite
-                       st->first_entry_t += st->update_every * 1000000ULL;
-               }
-               
                st->counter++;
                st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
-               if(unlikely(!st->first_entry_t)) st->first_entry_t = next_ut;
                last_ut = next_ut;
        }
 
@@ -1229,12 +1222,3 @@ unsigned long long rrdset_done(RRDSET *st)
 
        return(st->usec_since_last_update);
 }
-
-
-// find the oldest entry in the data, skipping all empty slots
-time_t rrdset_first_entry_t(RRDSET *st)
-{
-       if(unlikely(!st->first_entry_t)) return st->last_updated.tv_sec;
-       
-       return st->first_entry_t / 1000000;
-}
index ec4e54649cee665cdff6bda643db6264a215dc2b..c179aafee8ffcb18b4164313d02d288fad6bc285 100755 (executable)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -21,8 +21,8 @@ extern int rrd_delete_unupdated_dimensions;
 
 #define RRD_ID_LENGTH_MAX 1024
 
-#define RRDSET_MAGIC           "NETDATA RRD SET FILE V012"
-#define RRDDIMENSION_MAGIC     "NETDATA RRD DIMENSION FILE V012"
+#define RRDSET_MAGIC           "NETDATA RRD SET FILE V013"
+#define RRDDIMENSION_MAGIC     "NETDATA RRD DIMENSION FILE V013"
 
 typedef long long total_number;
 #define TOTAL_NUMBER_FORMAT "%lld"
@@ -211,8 +211,6 @@ struct rrdset {
        unsigned long counter;                                                  // the number of times we added values to this rrd
        unsigned long counter_done;                                             // the number of times we added values to this rrd
 
-       unsigned long long first_entry_t;                               // the timestamp (in microseconds) of the oldest entry in the db
-
        uint32_t hash;                                                                  // a simple hash on the id, to speed up searching
                                                                                                        // we first compare hashes, and only if the hashes are equal we do string comparisons
 
@@ -271,8 +269,33 @@ extern void rrdset_next_plugins(RRDSET *st);
 
 extern unsigned long long rrdset_done(RRDSET *st);
 
-extern time_t rrdset_first_entry_t(RRDSET *st);
+// get the total duration in seconds of the round robin database: min(counter, entries) * update_every, cast to time_t (st is evaluated more than once)
+#define rrdset_duration(st) ((time_t)( (((st)->counter >= ((unsigned long)(st)->entries))?(unsigned long)(st)->entries:(st)->counter) * (st)->update_every ))
+
+// get the timestamp of the last entry in the round robin database (the seconds part of st->last_updated)
+#define rrdset_last_entry_t(st) ((time_t)(((st)->last_updated.tv_sec)))
+
+// get the timestamp of first entry in the round robin database, computed as last entry timestamp minus rrdset_duration()
+#define rrdset_first_entry_t(st) ((time_t)(rrdset_last_entry_t(st) - rrdset_duration(st)))
+
+// get the last slot updated in the round robin database: the slot just before current_entry, wrapping to entries - 1 when current_entry is 0
+#define rrdset_last_slot(st) (((st)->current_entry == 0) ? (st)->entries - 1 : (st)->current_entry - 1)
+
+// get the first / oldest slot updated in the round robin database: this is simply current_entry, the slot that will be written next
+#define rrdset_first_slot(st) ((st)->current_entry)
 
+// get the slot of the round robin database, for the given timestamp (t): t at or after the last entry maps to the last slot, t at or before the first entry maps to the first slot
+// otherwise it steps back (last_entry_t - t) / update_every slots from the last slot, adding entries when the result would be negative; it always returns a valid slot, although may not be for the time requested if the time is outside the round robin database (st and t are evaluated more than once)
+#define rrdset_time2slot(st, t) ( \
+               (  (t) >= rrdset_last_entry_t(st))  ? ( rrdset_last_slot(st) ) : \
+               ( ((t) <= rrdset_first_entry_t(st)) ?   rrdset_first_slot(st) : \
+               ( (rrdset_last_slot(st) >= ((long)(rrdset_last_entry_t(st) - (t)) / (long)((st)->update_every)) ) ? \
+                 (rrdset_last_slot(st) -  ((long)(rrdset_last_entry_t(st) - (t)) / (long)((st)->update_every)) ) : \
+                 (rrdset_last_slot(st) -  ((long)(rrdset_last_entry_t(st) - (t)) / (long)((st)->update_every)) + (st)->entries ) \
+               )))
+
+// get the timestamp of a specific slot in the round robin database
+// this is the inverse of rrdset_time2slot(): count how many slots the given slot is behind the last
+// updated slot (wrapping around the end of the ring when slot > last slot) and step back that many
+// update_every intervals from the last entry timestamp; st and slot are evaluated more than once
+#define rrdset_slot2time(st, slot) ((time_t)( rrdset_last_entry_t(st) - \
+               (time_t)((st)->update_every) * (time_t)( \
+                       ((long)(slot) > (long)rrdset_last_slot(st)) ? \
+                       ((long)rrdset_last_slot(st) - (long)(slot) + (long)(st)->entries) : \
+                       ((long)rrdset_last_slot(st) - (long)(slot)) \
+               )))
 
 // ----------------------------------------------------------------------------
 // RRD DIMENSION functions
@@ -290,4 +313,7 @@ extern int rrddim_unhide(RRDSET *st, const char *id);
 extern void rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value);
 extern int rrddim_set(RRDSET *st, const char *id, collected_number value);
 
+
+
+
 #endif /* NETDATA_RRD_H */
index 875ea6d9202c3e57b8279f658fc2080c4806bdb7..248375ee36a912596879a1f1eea861820118388c 100755 (executable)
@@ -53,9 +53,9 @@ unsigned long rrd_stats_one_json(RRDSET *st, char *options, struct web_buffer *w
                , st->counter
                , st->entries
                , rrdset_first_entry_t(st)
-               , st->current_entry
-               , st->last_updated.tv_sec
-               , now - (st->last_updated.tv_sec > now) ? now : st->last_updated.tv_sec
+               , rrdset_last_slot(st)
+               , rrdset_last_entry_t(st)
+               , (now < rrdset_last_entry_t(st)) ? (time_t)0 : now - rrdset_last_entry_t(st)
                , st->update_every
                , st->isdetail
                , st->usec_since_last_update
@@ -190,15 +190,10 @@ unsigned long rrd_stats_json(int type, RRDSET *st, struct web_buffer *wb, int en
        if(entries_to_show < 1) entries_to_show = 1;
        if(group < 1) group = 1;
        
-       // make sure current_entry is within limits
-       long current_entry = (long)st->current_entry - (long)1;
-       if(current_entry < 0) current_entry = 0;
-       else if(current_entry >= st->entries) current_entry = st->entries - 1;
-       
        // find the oldest entry of the round-robin
        long max_entries_init = (st->counter < (unsigned long)st->entries) ? st->counter : (unsigned long)st->entries;
        
-       time_t time_init = st->last_updated.tv_sec;
+       time_t time_init = rrdset_last_entry_t(st);
        
        if(before == 0 || before > time_init) before = time_init;
        if(after  == 0) after = rrdset_first_entry_t(st);
@@ -242,8 +237,8 @@ unsigned long rrd_stats_json(int type, RRDSET *st, struct web_buffer *wb, int en
                debug(D_RRD_STATS, "%s first_entry_t = %lu, last_entry_t = %lu, duration = %lu, after = %lu, before = %lu, duration = %lu, entries_to_show = %lu, group = %lu, max_entries = %ld"
                        , st->id
                        , rrdset_first_entry_t(st)
-                       , st->last_updated.tv_sec
-                       , st->last_updated.tv_sec - rrdset_first_entry_t(st)
+                       , rrdset_last_entry_t(st)
+                       , rrdset_last_entry_t(st) - rrdset_first_entry_t(st)
                        , after
                        , before
                        , before - after
@@ -264,19 +259,22 @@ unsigned long rrd_stats_json(int type, RRDSET *st, struct web_buffer *wb, int en
        // temp arrays for keeping values per dimension
        
        calculated_number group_values[dimensions]; // keep sums when grouping
-       calculated_number print_values[dimensions]; // keep the final value to be printed
        int               print_hidden[dimensions]; // keep hidden flags
        int               found_non_zero[dimensions];
        int               found_non_existing[dimensions];
 
        // initialize them
        for( rd = st->dimensions, c = 0 ; rd && c < dimensions ; rd = rd->next, c++) {
-               group_values[c] = print_values[c] = 0;
+               group_values[c] = 0;
                print_hidden[c] = rd->hidden;
                found_non_zero[c] = 0;
                found_non_existing[c] = 0;
        }
 
+
+       // error("OLD: points=%d after=%d before=%d group=%d, duration=%d", entries_to_show, before - (st->update_every * group * entries_to_show), before, group, before - after + 1);
+       // rrd2array(st, entries_to_show, before - (st->update_every * group * entries_to_show), before, group_method, only_non_zero);
+
        // -------------------------------------------------------------------------
        // remove dimensions that contain only zeros
 
@@ -329,7 +327,10 @@ unsigned long rrd_stats_json(int type, RRDSET *st, struct web_buffer *wb, int en
 
                long count = 0, printed = 0, group_count = 0;
                last_timestamp = 0;
-               for(t = current_entry; max_entries ; now -= st->update_every, t--, max_entries--) {
+
+               long expected_to_start_at_slot = rrdset_time2slot(st, before);
+
+               for(t = rrdset_last_slot(st); max_entries ; now -= st->update_every, t--, max_entries--) {
                        if(t < 0) t = st->entries - 1;
 
                        int print_this = 0;
@@ -350,6 +351,11 @@ unsigned long rrd_stats_json(int type, RRDSET *st, struct web_buffer *wb, int en
                        // make sure we return data in the proper time range
                        if(now < after || now > before) continue;
 
+                       if(expected_to_start_at_slot != -999999) {
+                               error("%s: Expected to start on slot %ld, started on %ld, %s", st->id, expected_to_start_at_slot, t, (t == expected_to_start_at_slot)?"OK":"ERROR");
+                               expected_to_start_at_slot = -999999;
+                       }
+
                        count++;
                        group_count++;
 
@@ -488,6 +494,349 @@ unsigned long rrd_stats_json(int type, RRDSET *st, struct web_buffer *wb, int en
 }
 
 
+// ----------------------------------------------------------------------------
+
+// RRDR options: per-value flag bits, OR-ed together and stored in the RRDR 'o' array
+#define RRDR_EMPTY     0x01 // no existing source value was found for this point
+#define RRDR_RESET     0x02 // at least one source value of this point was flagged as reset
+#define RRDR_HIDDEN    0x04 // the dimension of this value is hidden
+#define RRDR_NONZERO   0x08 // at least one source value of this point was not zero
+
+
+typedef struct rrdresult {
+       RRDSET *st;                     // the chart this result refers to
+
+       int d;                                  // the number of dimensions (columns of each line)
+       int n;                                  // the number of lines the arrays have room for
+
+       time_t *t;                              // array of n timestamps, one per line
+       calculated_number *v;   // array n x d values, line after line
+       uint8_t *o;                             // array n x d options (RRDR_* flags), parallel to v
+
+       int c;                                  // current line (index into t, and line index into v and o)
+
+       int has_st_lock;                // non-zero if st is read locked by us
+} RRDR;
+
+// return a pointer to the first of the d values of the current line (r->c)
+static inline calculated_number *rrdr_line_values(RRDR *r)
+{
+       calculated_number *line = r->v + (r->c * r->d);
+       return line;
+}
+
+// return a pointer to the first of the d option bytes of the current line (r->c)
+static inline uint8_t *rrdr_line_options(RRDR *r)
+{
+       uint8_t *line = r->o + (r->c * r->d);
+       return line;
+}
+
+// stamp the current line with time t and advance to the next line
+// returns 1 when there is a next line, 0 when the current line was the last one
+// (in which case the cursor stays on it)
+static inline int rrdr_line_next(RRDR *r, time_t t)
+{
+       r->t[r->c] = t;
+
+       // the line we just stamped is the last one available
+       if(unlikely(r->c + 1 >= r->n))
+               return 0;
+
+       r->c++;
+       return 1;
+}
+
+// read lock the chart this result refers to and remember that we hold the lock
+static inline void rrdr_lock_rrdset(RRDR *r) {
+       if(likely(r)) {
+               pthread_rwlock_rdlock(&r->st->rwlock);
+               r->has_st_lock = 1;
+               return;
+       }
+
+       error("NULL value given!");
+}
+
+// release the read lock of the chart, but only if this result is holding it
+static inline void rrdr_unlock_rrdset(RRDR *r) {
+       if(unlikely(!r)) {
+               error("NULL value given!");
+               return;
+       }
+
+       // nothing to release
+       if(unlikely(!r->has_st_lock))
+               return;
+
+       pthread_rwlock_unlock(&r->st->rwlock);
+       r->has_st_lock = 0;
+}
+
+// release the chart lock (if held) and free the result set together with its arrays
+static inline void rrdr_free(RRDR *r)
+{
+       if(likely(r)) {
+               rrdr_unlock_rrdset(r);
+
+               // free() accepts NULL, so arrays that were never allocated need no check
+               free(r->t);
+               free(r->v);
+               free(r->o);
+               free(r);
+               return;
+       }
+
+       error("NULL value given!");
+}
+
+// allocate a result set for chart st, with room for n lines
+//
+// the chart is read locked here and the lock is kept on success, so that the
+// caller can read the chart data; rrdr_free() releases it.
+// the number of dimensions of the chart is counted and saved in r->d.
+//
+// returns the new RRDR, or NULL when st is NULL or an allocation failed
+// (in which case everything allocated so far is freed and the lock is released)
+static RRDR *rrdr_create(RRDSET *st, int n)
+{
+       if(unlikely(!st)) {
+               error("NULL value given!");
+               return NULL;
+       }
+
+       RRDR *r = calloc(1, sizeof(RRDR));
+       if(unlikely(!r)) goto cleanup;
+
+       r->st = st;
+
+       rrdr_lock_rrdset(r);
+
+       RRDDIM *rd;
+       for(rd = st->dimensions ; rd ; rd = rd->next) r->d++;
+
+       r->n = n;
+
+       // one timestamp per line (allocated only once - a second allocation would leak the first)
+       r->t = malloc(sizeof(*r->t) * n);
+       if(unlikely(!r->t)) goto cleanup;
+
+       // n x d values
+       r->v = malloc(sizeof(*r->v) * n * r->d);
+       if(unlikely(!r->v)) goto cleanup;
+
+       // n x d option bytes - sized by the element type of r->o, not by calculated_number
+       r->o = malloc(sizeof(*r->o) * n * r->d);
+       if(unlikely(!r->o)) goto cleanup;
+
+       return r;
+
+cleanup:
+       error("Cannot allocate memory");
+       if(likely(r)) rrdr_free(r);
+       return NULL;
+}
+
+// Build a result set (RRDR) from the round robin database of chart st.
+//
+// st           the chart to read
+// points       the number of points to generate; when <= 0, it is set to the duration
+// after        the start time of the calculation; values not greater than
+//              update_every * entries are taken as relative to the last entry
+// before       the end time of the calculation; relative values as for 'after'
+// group_method GROUP_MAX, GROUP_SUM or GROUP_AVERAGE (anything else is summed)
+//
+// Source entries are walked from the newest to the oldest, so the lines of the
+// result are stored newest first. Every 'group' source entries are aggregated
+// into one line; a trailing incomplete group is not emitted.
+//
+// Returns NULL when the requested duration is empty, when the chart has no
+// dimensions or when memory cannot be allocated. Otherwise the returned RRDR
+// still holds the read lock of the chart (taken by rrdr_create()); the caller
+// must call rrdr_free() to release both the lock and the memory.
+RRDR *rrd2rrdr(RRDSET *st, long points, time_t after, time_t before, int group_method)
+{
+       time_t first_entry_t = rrdset_first_entry_t(st);
+       time_t last_entry_t = rrdset_last_entry_t(st);
+
+       // allow relative for before and after
+       if(before <= st->update_every * st->entries) before = last_entry_t + before;
+       if(after <= st->update_every * st->entries) after = last_entry_t + after;
+
+       // make sure they are within our timeframe
+       if(before > last_entry_t) before = last_entry_t;
+       if(before < first_entry_t) before = first_entry_t;
+
+       if(after > last_entry_t) after = last_entry_t;
+       if(after < first_entry_t) after = first_entry_t;
+
+       // check if they are upside down
+       if(after > before) {
+               time_t t = before;
+               before = after;
+               after = t;
+       }
+
+       // the duration of the chart
+       time_t duration = before - after;
+       if(duration <= 0) return NULL;
+
+       // check the required points
+       if(points <= 0) points = duration;
+
+       // calculate proper grouping of source data
+       int group = duration / points;
+       if(group <= 0) group = 1;
+       if(duration / group > points) group++;
+
+       // align timestamps to group
+       before -= before % group;
+       after -= after % group;
+       duration = before - after;
+
+       // error("NEW: points=%d after=%d before=%d group=%d, duration=%d", points, after, before, group, duration);
+
+       // Now we have:
+       // before = the end time of the calculation
+       // after = the start time of the calculation
+       // duration = the duration of the calculation
+       // group = the number of source points to aggregate / group together
+       // method = the method of grouping source points
+       // points = the number of points to generate
+
+
+       // -------------------------------------------------------------------------
+       // initialize our result set
+
+       RRDR *r = rrdr_create(st, points);
+       if(!r) return NULL;
+       if(!r->d) {
+               rrdr_free(r);
+               return NULL;
+       }
+
+       // find how many dimensions we have
+       long dimensions = r->d;
+
+
+       // -------------------------------------------------------------------------
+
+       // how many entries can we use from the source data?
+       long max_entries = (st->counter < (unsigned long)st->entries) ? st->counter : (unsigned long)st->entries;
+
+
+       // -------------------------------------------------------------------------
+       // checks for debugging
+
+       // the arguments are cast to match the %lu conversions of the format string
+       if(st->debug) {
+               debug(D_RRD_STATS, "%s first_entry_t = %lu, last_entry_t = %lu, duration = %lu, after = %lu, before = %lu, duration = %lu, entries_to_show = %lu, group = %lu, max_entries = %ld"
+                       , st->id
+                       , (unsigned long)first_entry_t
+                       , (unsigned long)last_entry_t
+                       , (unsigned long)(last_entry_t - first_entry_t)
+                       , (unsigned long)after
+                       , (unsigned long)before
+                       , (unsigned long)duration
+                       , (unsigned long)points
+                       , (unsigned long)group
+                       , max_entries
+                       );
+       }
+
+
+       // -------------------------------------------------------------------------
+       // temp arrays for keeping values per dimension
+
+       calculated_number       group_values[dimensions]; // keep sums when grouping
+       long                            group_counts[dimensions]; // keep the number of values added to group_values
+       uint8_t                         group_options[dimensions];
+       uint8_t                         found_non_zero[dimensions];
+
+
+       // initialize them
+       RRDDIM *rd;
+       long c;
+       for( rd = st->dimensions, c = 0 ; rd && c < dimensions ; rd = rd->next, c++) {
+               group_values[c] = 0;
+               group_counts[c] = 0;
+               group_options[c] = 0;
+               found_non_zero[c] = 0;
+       }
+
+
+       // -------------------------------------------------------------------------
+       // the main loop
+
+       int debug = st->debug;
+
+       time_t  now = last_entry_t,
+                       dt = st->update_every;
+
+       // 'now' starts at the last entry, so 't' has to start at the last slot too;
+       // otherwise timestamps and slots would refer to different entries.
+       // group_count starts at zero, so that a line is emitted after 'group' entries.
+       long    t = rrdset_last_slot(st),
+                       count = 0,
+                       added = 0,
+                       group_count = 0,
+                       add_this = 0;
+
+       for( ; max_entries ; now -= dt, t--, max_entries--) {
+               // wrap around the beginning of the ring
+               if(unlikely(t < 0)) t = st->entries - 1;
+
+               if(unlikely(debug)) debug(D_RRD_STATS, "%s c = %ld, count = %ld, group_count = %ld, added = %ld, now = %lu, %s %s"
+                               , st->id
+                               , t
+                               , count + 1
+                               , group_count + 1
+                               , added
+                               , (unsigned long)now
+                               , (group_count == 0)?"PRINT":"  -  "
+                               , (now >= after && now <= before)?"RANGE":"  -  "
+                               );
+
+               // make sure we return data in the proper time range
+               if(unlikely(now < after || now > before)) continue;
+
+               count++;
+               group_count++;
+
+               if(unlikely(group_count == group)) {
+                       if(unlikely(added >= points)) break;
+                       add_this = 1;
+               }
+
+               // do the calculations
+               for(rd = st->dimensions, c = 0 ; likely(rd && c < dimensions) ; rd = rd->next, c++) {
+                       storage_number n = rd->values[t];
+                       if(unlikely(!does_storage_number_exist(n))) continue;
+
+                       group_counts[c]++;
+
+                       calculated_number value = unpack_storage_number(n);
+                       if(value != 0.0) {
+                               group_options[c] |= RRDR_NONZERO;
+                               found_non_zero[c] = 1;
+                       }
+
+                       if(unlikely(did_storage_number_reset(n)))
+                               group_options[c] |= RRDR_RESET;
+
+                       switch(group_method) {
+                               case GROUP_MAX: {
+                                       // keep the value with the biggest magnitude;
+                                       // abs() is not used, because it works on int and would truncate the values
+                                       calculated_number new_magnitude = (value < 0) ? -value : value;
+                                       calculated_number old_magnitude = (group_values[c] < 0) ? -group_values[c] : group_values[c];
+                                       if(unlikely(new_magnitude > old_magnitude))
+                                               group_values[c] = value;
+                                       break;
+                               }
+
+                               default:
+                               case GROUP_SUM:
+                               case GROUP_AVERAGE:
+                                       group_values[c] += value;
+                                       break;
+                       }
+               }
+
+               // added it
+               if(unlikely(add_this)) {
+                       calculated_number *cn = rrdr_line_values(r);
+                       uint8_t *co = rrdr_line_options(r);
+
+                       for(rd = st->dimensions, c = 0 ; likely(rd && c < dimensions) ; rd = rd->next, c++) {
+                               if(rd->hidden) group_options[c] |= RRDR_HIDDEN;
+
+                               co[c] = group_options[c];
+
+                               if(group_counts[c] == 0) {
+                                       cn[c] = 0.0;
+                                       co[c] |= RRDR_EMPTY;
+                               }
+                               else if(unlikely(group_method == GROUP_AVERAGE)) {
+                                       cn[c] = group_values[c] / group_counts[c];
+                               }
+                               else {
+                                       cn[c] = group_values[c];
+                               }
+
+                               // reset them for the next loop
+                               group_values[c] = 0;
+                               group_counts[c] = 0;
+                               group_options[c] = 0;
+                       }
+
+                       added++;
+                       group_count = 0;
+                       add_this = 0;
+
+                       // the line is complete: stamp it and move to the next one.
+                       // this is done once per emitted line (not once per source entry),
+                       // and we stop when the result set has no more lines
+                       if(unlikely(!rrdr_line_next(r, now))) break;
+               }
+       }
+
+       return r;
+}
+
+
 /*
 unsigned long rrdset2json(int type, RRDSET *st, struct web_buffer *wb, int entries_to_show, int group, int group_method, time_t after, time_t before, int only_non_zero) {