]> arthur.barton.de Git - netdata.git/blob - src/rrd.c
5ba5d39a3990fa5f973015998141b3f47e7c8601
[netdata.git] / src / rrd.c
1 #include "common.h"
2
3 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
4
5 // ----------------------------------------------------------------------------
6 // globals
7
8 // if not zero it gives the time (in seconds) to remove un-updated dimensions
9 // DO NOT ENABLE
10 // if dimensions are removed, the chart generation will have to run again
11 int rrd_delete_unupdated_dimensions = 0;
12
13 int rrd_update_every = UPDATE_EVERY;
14 int rrd_default_history_entries = RRD_DEFAULT_HISTORY_ENTRIES;
15 int rrd_memory_mode = RRD_MEMORY_MODE_SAVE;
16
17 static int rrdset_compare(void* a, void* b);
18 static int rrdset_compare_name(void* a, void* b);
19 static int rrdcontext_compare(void* a, void* b);
20
21 RRDHOST localhost = {
22                 .hostname = "localhost",
23                 .rrdset_root = NULL,
24                 .rrdset_root_rwlock = PTHREAD_RWLOCK_INITIALIZER,
25         .rrdset_root_index = {
26             { NULL, rrdset_compare },
27             AVL_LOCK_INITIALIZER
28         },
29         .rrdset_root_index_name = {
30             { NULL, rrdset_compare_name },
31             AVL_LOCK_INITIALIZER
32         },
33         .rrdcontext_root_index = {
34             { NULL, rrdcontext_compare },
35             AVL_LOCK_INITIALIZER
36         },
37         .variables_root_index = {
38             { NULL, rrdvar_compare },
39             AVL_LOCK_INITIALIZER
40         }
41 };
42
43 // ----------------------------------------------------------------------------
44 // RRDCONTEXT index
45
46 static int rrdcontext_compare(void* a, void* b) {
47     if(((RRDCONTEXT *)a)->hash < ((RRDCONTEXT *)b)->hash) return -1;
48     else if(((RRDCONTEXT *)a)->hash > ((RRDCONTEXT *)b)->hash) return 1;
49     else return strcmp(((RRDCONTEXT *)a)->id, ((RRDCONTEXT *)b)->id);
50 }
51
52 #define rrdcontext_index_add(host, rc) (RRDCONTEXT *)avl_insert_lock(&((host)->rrdcontext_root_index), (avl *)(rc))
53 #define rrdcontext_index_del(host, rc) (RRDCONTEXT *)avl_remove_lock(&((host)->rrdcontext_root_index), (avl *)(rc))
54
55 static RRDCONTEXT *rrdcontext_index_find(RRDHOST *host, const char *id, uint32_t hash) {
56     RRDCONTEXT tmp;
57     tmp.id = id;
58     tmp.hash = (hash)?hash:simple_hash(tmp.id);
59
60     return (RRDCONTEXT *)avl_search_lock(&(host->rrdcontext_root_index), (avl *) &tmp);
61 }
62
63 RRDCONTEXT *rrdcontext_create(const char *id) {
64     RRDCONTEXT *rc = rrdcontext_index_find(&localhost, id, 0);
65     if(!rc) {
66         rc = calloc(1, sizeof(RRDCONTEXT));
67         if(!rc) fatal("Cannot allocate RRDCONTEXT memory");
68
69         rc->id = strdup(id);
70         if(!rc->id) fatal("Cannot allocate RRDCONTEXT.id memory");
71
72         rc->hash = simple_hash(rc->id);
73
74         // initialize the variables index
75         avl_init_lock(&rc->variables_root_index, rrdvar_compare);
76
77         RRDCONTEXT *ret = rrdcontext_index_add(&localhost, rc);
78         if(ret != rc)
79             fatal("INTERNAL ERROR: Expected to INSERT RRDCONTEXT '%s' into index, but inserted '%s'.", rc->id, (ret)?ret->id:"NONE");
80     }
81
82     rc->use_count++;
83     return rc;
84 }
85
86 void rrdcontext_free(RRDCONTEXT *rc) {
87     rc->use_count--;
88     if(!rc->use_count) {
89         RRDCONTEXT *ret = rrdcontext_index_del(&localhost, rc);
90         if(ret != rc)
91             fatal("INTERNAL ERROR: Expected to DELETE RRDCONTEXT '%s' from index, but deleted '%s'.", rc->id, (ret)?ret->id:"NONE");
92
93         if(rc->variables_root_index.avl_tree.root != NULL)
94             fatal("INTERNAL ERROR: Variables index of RRDCONTEXT '%s' that is freed, is not empty.", rc->id);
95
96         free((void *)rc->id);
97         free(rc);
98     }
99 }
100
101 // ----------------------------------------------------------------------------
102 // RRDSET index
103
104 static int rrdset_compare(void* a, void* b) {
105     if(((RRDSET *)a)->hash < ((RRDSET *)b)->hash) return -1;
106     else if(((RRDSET *)a)->hash > ((RRDSET *)b)->hash) return 1;
107     else return strcmp(((RRDSET *)a)->id, ((RRDSET *)b)->id);
108 }
109
110 #define rrdset_index_add(host, st) (RRDSET *)avl_insert_lock(&((host)->rrdset_root_index), (avl *)(st))
111 #define rrdset_index_del(host, st) (RRDSET *)avl_remove_lock(&((host)->rrdset_root_index), (avl *)(st))
112
113 static RRDSET *rrdset_index_find(RRDHOST *host, const char *id, uint32_t hash) {
114     RRDSET tmp;
115     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
116     tmp.hash = (hash)?hash:simple_hash(tmp.id);
117
118     return (RRDSET *)avl_search_lock(&(host->rrdset_root_index), (avl *) &tmp);
119 }
120
121 // ----------------------------------------------------------------------------
122 // RRDSET name index
123
124 #define rrdset_from_avlname(avlname_ptr) ((RRDSET *)((avlname_ptr) - offsetof(RRDSET, avlname)))
125
126 static int rrdset_compare_name(void* a, void* b) {
127         RRDSET *A = rrdset_from_avlname(a);
128         RRDSET *B = rrdset_from_avlname(b);
129
130         // fprintf(stderr, "COMPARING: %s with %s\n", A->name, B->name);
131
132         if(A->hash_name < B->hash_name) return -1;
133         else if(A->hash_name > B->hash_name) return 1;
134         else return strcmp(A->name, B->name);
135 }
136
137 RRDSET *rrdset_index_add_name(RRDHOST *host, RRDSET *st) {
138     void *result;
139         // fprintf(stderr, "ADDING: %s (name: %s)\n", st->id, st->name);
140         result = avl_insert_lock(&host->rrdset_root_index_name, (avl *) (&st->avlname));
141     if(result) return rrdset_from_avlname(result);
142     return NULL;
143 }
144
145 RRDSET *rrdset_index_del_name(RRDHOST *host, RRDSET *st) {
146     void *result;
147     // fprintf(stderr, "DELETING: %s (name: %s)\n", st->id, st->name);
148     return (RRDSET *)avl_remove_lock(&((host)->rrdset_root_index_name), (avl *)(&st->avlname));
149     if(result) return rrdset_from_avlname(result);
150     return NULL;
151 }
152
153 static RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, uint32_t hash) {
154         void *result = NULL;
155         RRDSET tmp;
156         tmp.name = name;
157         tmp.hash_name = (hash)?hash:simple_hash(tmp.name);
158
159         // fprintf(stderr, "SEARCHING: %s\n", name);
160         result = avl_search_lock(&host->rrdset_root_index_name, (avl *) (&(tmp.avlname)));
161         if(result) {
162                 RRDSET *st = rrdset_from_avlname(result);
163                 if(strcmp(st->magic, RRDSET_MAGIC))
164                         error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name);
165
166                 // fprintf(stderr, "FOUND: %s\n", name);
167                 return rrdset_from_avlname(result);
168         }
169         // fprintf(stderr, "NOT FOUND: %s\n", name);
170         return NULL;
171 }
172
173
174 // ----------------------------------------------------------------------------
175 // RRDDIM index
176
177 static int rrddim_compare(void* a, void* b) {
178         if(((RRDDIM *)a)->hash < ((RRDDIM *)b)->hash) return -1;
179         else if(((RRDDIM *)a)->hash > ((RRDDIM *)b)->hash) return 1;
180         else return strcmp(((RRDDIM *)a)->id, ((RRDDIM *)b)->id);
181 }
182
183 #define rrddim_index_add(st, rd) avl_insert_lock(&((st)->dimensions_index), (avl *)(rd))
184 #define rrddim_index_del(st,rd ) avl_remove_lock(&((st)->dimensions_index), (avl *)(rd))
185
186 static RRDDIM *rrddim_index_find(RRDSET *st, const char *id, uint32_t hash) {
187         RRDDIM tmp;
188         strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
189         tmp.hash = (hash)?hash:simple_hash(tmp.id);
190
191         return (RRDDIM *)avl_search_lock(&(st->dimensions_index), (avl *) &tmp);
192 }
193
194 // ----------------------------------------------------------------------------
195 // chart types
196
197 int rrdset_type_id(const char *name)
198 {
199         if(unlikely(strcmp(name, RRDSET_TYPE_AREA_NAME) == 0)) return RRDSET_TYPE_AREA;
200         else if(unlikely(strcmp(name, RRDSET_TYPE_STACKED_NAME) == 0)) return RRDSET_TYPE_STACKED;
201         else if(unlikely(strcmp(name, RRDSET_TYPE_LINE_NAME) == 0)) return RRDSET_TYPE_LINE;
202         return RRDSET_TYPE_LINE;
203 }
204
205 const char *rrdset_type_name(int chart_type)
206 {
207         static char line[] = RRDSET_TYPE_LINE_NAME;
208         static char area[] = RRDSET_TYPE_AREA_NAME;
209         static char stacked[] = RRDSET_TYPE_STACKED_NAME;
210
211         switch(chart_type) {
212                 case RRDSET_TYPE_LINE:
213                         return line;
214
215                 case RRDSET_TYPE_AREA:
216                         return area;
217
218                 case RRDSET_TYPE_STACKED:
219                         return stacked;
220         }
221         return line;
222 }
223
224 // ----------------------------------------------------------------------------
225 // load / save
226
227 const char *rrd_memory_mode_name(int id)
228 {
229         static const char ram[] = RRD_MEMORY_MODE_RAM_NAME;
230         static const char map[] = RRD_MEMORY_MODE_MAP_NAME;
231         static const char save[] = RRD_MEMORY_MODE_SAVE_NAME;
232
233         switch(id) {
234                 case RRD_MEMORY_MODE_RAM:
235                         return ram;
236
237                 case RRD_MEMORY_MODE_MAP:
238                         return map;
239
240                 case RRD_MEMORY_MODE_SAVE:
241                 default:
242                         return save;
243         }
244
245         return save;
246 }
247
248 int rrd_memory_mode_id(const char *name)
249 {
250         if(unlikely(!strcmp(name, RRD_MEMORY_MODE_RAM_NAME)))
251                 return RRD_MEMORY_MODE_RAM;
252         else if(unlikely(!strcmp(name, RRD_MEMORY_MODE_MAP_NAME)))
253                 return RRD_MEMORY_MODE_MAP;
254
255         return RRD_MEMORY_MODE_SAVE;
256 }
257
258 // ----------------------------------------------------------------------------
259 // algorithms types
260
261 int rrddim_algorithm_id(const char *name)
262 {
263         if(strcmp(name, RRDDIM_INCREMENTAL_NAME) == 0)                  return RRDDIM_INCREMENTAL;
264         if(strcmp(name, RRDDIM_ABSOLUTE_NAME) == 0)                     return RRDDIM_ABSOLUTE;
265         if(strcmp(name, RRDDIM_PCENT_OVER_ROW_TOTAL_NAME) == 0)                 return RRDDIM_PCENT_OVER_ROW_TOTAL;
266         if(strcmp(name, RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME) == 0)        return RRDDIM_PCENT_OVER_DIFF_TOTAL;
267         return RRDDIM_ABSOLUTE;
268 }
269
270 const char *rrddim_algorithm_name(int chart_type)
271 {
272         static char absolute[] = RRDDIM_ABSOLUTE_NAME;
273         static char incremental[] = RRDDIM_INCREMENTAL_NAME;
274         static char percentage_of_absolute_row[] = RRDDIM_PCENT_OVER_ROW_TOTAL_NAME;
275         static char percentage_of_incremental_row[] = RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME;
276
277         switch(chart_type) {
278                 case RRDDIM_ABSOLUTE:
279                         return absolute;
280
281                 case RRDDIM_INCREMENTAL:
282                         return incremental;
283
284                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
285                         return percentage_of_absolute_row;
286
287                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
288                         return percentage_of_incremental_row;
289         }
290         return absolute;
291 }
292
293 // ----------------------------------------------------------------------------
294 // chart names
295
// Copy at most 'length' characters of 'from' into 'to', replacing every
// character that is neither '.' nor alphanumeric with '_'.
// The result is always NUL terminated, so 'to' must have room for
// length + 1 bytes. Returns 'to'.
char *rrdset_strncpyz_name(char *to, const char *from, size_t length)
{
    char *p = to;

    for(; length && *from ; length--, from++) {
        // the <ctype.h> functions are only defined for values representable
        // as unsigned char (or EOF); a plain char may be negative
        unsigned char c = (unsigned char)*from;

        if(c == '.' || isalnum(c))
            *p++ = (char)c;
        else
            *p++ = '_';
    }

    *p = '\0';

    return to;
}
311
312 void rrdset_set_name(RRDSET *st, const char *name)
313 {
314         debug(D_RRD_CALLS, "rrdset_set_name() old: %s, new: %s", st->name, name);
315
316         if(st->name) rrdset_index_del_name(&localhost, st);
317
318         char b[CONFIG_MAX_VALUE + 1];
319         char n[RRD_ID_LENGTH_MAX + 1];
320
321         snprintfz(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
322         rrdset_strncpyz_name(b, n, CONFIG_MAX_VALUE);
323         st->name = config_get(st->id, "name", b);
324         st->hash_name = simple_hash(st->name);
325
326         rrdset_index_add_name(&localhost, st);
327 }
328
329 // ----------------------------------------------------------------------------
330 // cache directory
331
332 char *rrdset_cache_dir(const char *id)
333 {
334         char *ret = NULL;
335
336         static char *cache_dir = NULL;
337         if(!cache_dir) {
338                 cache_dir = config_get("global", "cache directory", CACHE_DIR);
339                 int r = mkdir(cache_dir, 0755);
340                 if(r != 0 && errno != EEXIST)
341                         error("Cannot create directory '%s'", cache_dir);
342         }
343
344         char b[FILENAME_MAX + 1];
345         char n[FILENAME_MAX + 1];
346         rrdset_strncpyz_name(b, id, FILENAME_MAX);
347
348         snprintfz(n, FILENAME_MAX, "%s/%s", cache_dir, b);
349         ret = config_get(id, "cache directory", n);
350
351         if(rrd_memory_mode == RRD_MEMORY_MODE_MAP || rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
352                 int r = mkdir(ret, 0775);
353                 if(r != 0 && errno != EEXIST)
354                         error("Cannot create directory '%s'", ret);
355         }
356
357         return ret;
358 }
359
360 // ----------------------------------------------------------------------------
361 // core functions
362
363 void rrdset_reset(RRDSET *st)
364 {
365         debug(D_RRD_CALLS, "rrdset_reset() %s", st->name);
366
367         st->last_collected_time.tv_sec = 0;
368         st->last_collected_time.tv_usec = 0;
369         st->last_updated.tv_sec = 0;
370         st->last_updated.tv_usec = 0;
371         st->current_entry = 0;
372         st->counter = 0;
373         st->counter_done = 0;
374
375         RRDDIM *rd;
376         for(rd = st->dimensions; rd ; rd = rd->next) {
377                 rd->last_collected_time.tv_sec = 0;
378                 rd->last_collected_time.tv_usec = 0;
379                 rd->counter = 0;
380                 bzero(rd->values, rd->entries * sizeof(storage_number));
381         }
382 }
383
384 RRDSET *rrdset_create(const char *type, const char *id, const char *name, const char *family, const char *context, const char *title, const char *units, long priority, int update_every, int chart_type)
385 {
386         if(!type || !type[0]) {
387                 fatal("Cannot create rrd stats without a type.");
388                 return NULL;
389         }
390
391         if(!id || !id[0]) {
392                 fatal("Cannot create rrd stats without an id.");
393                 return NULL;
394         }
395
396         char fullid[RRD_ID_LENGTH_MAX + 1];
397         char fullfilename[FILENAME_MAX + 1];
398         RRDSET *st = NULL;
399
400         snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
401
402         st = rrdset_find(fullid);
403         if(st) {
404                 error("Cannot create rrd stats for '%s', it already exists.", fullid);
405                 return st;
406         }
407
408         long entries = config_get_number(fullid, "history", rrd_default_history_entries);
409         if(entries < 5) entries = config_set_number(fullid, "history", 5);
410         if(entries > RRD_HISTORY_ENTRIES_MAX) entries = config_set_number(fullid, "history", RRD_HISTORY_ENTRIES_MAX);
411
412         int enabled = config_get_boolean(fullid, "enabled", 1);
413         if(!enabled) entries = 5;
414
415         unsigned long size = sizeof(RRDSET);
416         char *cache_dir = rrdset_cache_dir(fullid);
417
418         debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
419
420         snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
421         if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0);
422         if(st) {
423                 if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
424                         errno = 0;
425                         info("Initializing file %s.", fullfilename);
426                         bzero(st, size);
427                 }
428                 else if(strcmp(st->id, fullid) != 0) {
429                         errno = 0;
430                         error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
431                         // munmap(st, size);
432                         // st = NULL;
433                         bzero(st, size);
434                 }
435                 else if(st->memsize != size || st->entries != entries) {
436                         errno = 0;
437                         error("File %s does not have the desired size. Clearing it.", fullfilename);
438                         bzero(st, size);
439                 }
440                 else if(st->update_every != update_every) {
441                         errno = 0;
442                         error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
443                         bzero(st, size);
444                 }
445                 else if((time(NULL) - st->last_updated.tv_sec) > update_every * entries) {
446                         errno = 0;
447                         error("File %s is too old. Clearing it.", fullfilename);
448                         bzero(st, size);
449                 }
450         }
451
452         if(st) {
453                 st->name = NULL;
454                 st->type = NULL;
455                 st->family = NULL;
456                 st->context = NULL;
457                 st->title = NULL;
458                 st->units = NULL;
459                 st->dimensions = NULL;
460                 st->next = NULL;
461                 st->mapped = rrd_memory_mode;
462         }
463         else {
464                 st = calloc(1, size);
465                 if(!st) {
466                         fatal("Cannot allocate memory for RRD_STATS %s.%s", type, id);
467                         return NULL;
468                 }
469                 st->mapped = RRD_MEMORY_MODE_RAM;
470         }
471         st->memsize = size;
472         st->entries = entries;
473         st->update_every = update_every;
474
475         if(st->current_entry >= st->entries) st->current_entry = 0;
476
477         strcpy(st->cache_filename, fullfilename);
478         strcpy(st->magic, RRDSET_MAGIC);
479
480         strcpy(st->id, fullid);
481         st->hash = simple_hash(st->id);
482
483         st->cache_dir = cache_dir;
484
485         st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
486         st->type       = config_get(st->id, "type", type);
487         st->family     = config_get(st->id, "family", family?family:st->type);
488         st->context    = config_get(st->id, "context", context?context:st->id);
489         st->units      = config_get(st->id, "units", units?units:"");
490
491         st->priority = config_get_number(st->id, "priority", priority);
492         st->enabled = enabled;
493
494         st->isdetail = 0;
495         st->debug = 0;
496
497         st->last_collected_time.tv_sec = 0;
498         st->last_collected_time.tv_usec = 0;
499         st->counter_done = 0;
500
501         st->gap_when_lost_iterations_above = (int) (
502                         config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
503
504         avl_init_lock(&st->dimensions_index, rrddim_compare);
505     avl_init_lock(&st->variables_root_index, rrdvar_compare);
506
507         pthread_rwlock_init(&st->rwlock, NULL);
508         pthread_rwlock_wrlock(&localhost.rrdset_root_rwlock);
509
510         if(name && *name) rrdset_set_name(st, name);
511         else rrdset_set_name(st, id);
512
513         {
514                 char varvalue[CONFIG_MAX_VALUE + 1];
515                 snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
516                 st->title = config_get(st->id, "title", varvalue);
517         }
518
519         st->next = localhost.rrdset_root;
520     localhost.rrdset_root = st;
521
522         rrdset_index_add(&localhost, st);
523
524     st->rrdcontext = rrdcontext_create(st->context);
525
526         pthread_rwlock_unlock(&localhost.rrdset_root_rwlock);
527
528         return(st);
529 }
530
531 RRDDIM *rrddim_add(RRDSET *st, const char *id, const char *name, long multiplier, long divisor, int algorithm)
532 {
533         char filename[FILENAME_MAX + 1];
534         char fullfilename[FILENAME_MAX + 1];
535
536         char varname[CONFIG_MAX_NAME + 1];
537         RRDDIM *rd = NULL;
538         unsigned long size = sizeof(RRDDIM) + (st->entries * sizeof(storage_number));
539
540         debug(D_RRD_CALLS, "Adding dimension '%s/%s'.", st->id, id);
541
542         rrdset_strncpyz_name(filename, id, FILENAME_MAX);
543         snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename);
544         if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) rd = (RRDDIM *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 1);
545         if(rd) {
546                 struct timeval now;
547                 gettimeofday(&now, NULL);
548
549                 if(strcmp(rd->magic, RRDDIMENSION_MAGIC) != 0) {
550                         errno = 0;
551                         info("Initializing file %s.", fullfilename);
552                         bzero(rd, size);
553                 }
554                 else if(rd->memsize != size) {
555                         errno = 0;
556                         error("File %s does not have the desired size. Clearing it.", fullfilename);
557                         bzero(rd, size);
558                 }
559                 else if(rd->multiplier != multiplier) {
560                         errno = 0;
561                         error("File %s does not have the same multiplier. Clearing it.", fullfilename);
562                         bzero(rd, size);
563                 }
564                 else if(rd->divisor != divisor) {
565                         errno = 0;
566                         error("File %s does not have the same divisor. Clearing it.", fullfilename);
567                         bzero(rd, size);
568                 }
569                 else if(rd->algorithm != algorithm) {
570                         errno = 0;
571                         error("File %s does not have the same algorithm. Clearing it.", fullfilename);
572                         bzero(rd, size);
573                 }
574                 else if(rd->update_every != st->update_every) {
575                         errno = 0;
576                         error("File %s does not have the same refresh frequency. Clearing it.", fullfilename);
577                         bzero(rd, size);
578                 }
579                 else if(usecdiff(&now, &rd->last_collected_time) > (rd->entries * rd->update_every * 1000000ULL)) {
580                         errno = 0;
581                         error("File %s is too old. Clearing it.", fullfilename);
582                         bzero(rd, size);
583                 }
584                 else if(strcmp(rd->id, id) != 0) {
585                         errno = 0;
586                         error("File %s contents are not for dimension %s. Clearing it.", fullfilename, id);
587                         // munmap(rd, size);
588                         // rd = NULL;
589                         bzero(rd, size);
590                 }
591         }
592
593         if(rd) {
594                 // we have a file mapped for rd
595                 rd->mapped = rrd_memory_mode;
596                 rd->flags = 0x00000000;
597                 rd->next = NULL;
598                 rd->name = NULL;
599         }
600         else {
601                 // if we didn't manage to get a mmap'd dimension, just create one
602
603                 rd = calloc(1, size);
604                 if(!rd) {
605                         fatal("Cannot allocate RRD_DIMENSION %s/%s.", st->id, id);
606                         return NULL;
607                 }
608
609                 rd->mapped = RRD_MEMORY_MODE_RAM;
610         }
611         rd->memsize = size;
612
613         strcpy(rd->magic, RRDDIMENSION_MAGIC);
614         strcpy(rd->cache_filename, fullfilename);
615         strncpyz(rd->id, id, RRD_ID_LENGTH_MAX);
616         rd->hash = simple_hash(rd->id);
617
618         snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
619         rd->name = config_get(st->id, varname, (name && *name)?name:rd->id);
620
621         snprintfz(varname, CONFIG_MAX_NAME, "dim %s algorithm", rd->id);
622         rd->algorithm = rrddim_algorithm_id(config_get(st->id, varname, rrddim_algorithm_name(algorithm)));
623
624         snprintfz(varname, CONFIG_MAX_NAME, "dim %s multiplier", rd->id);
625         rd->multiplier = config_get_number(st->id, varname, multiplier);
626
627         snprintfz(varname, CONFIG_MAX_NAME, "dim %s divisor", rd->id);
628         rd->divisor = config_get_number(st->id, varname, divisor);
629         if(!rd->divisor) rd->divisor = 1;
630
631         rd->entries = st->entries;
632         rd->update_every = st->update_every;
633
634         // prevent incremental calculation spikes
635         rd->counter = 0;
636         rd->updated = 0;
637         rd->calculated_value = 0;
638         rd->last_calculated_value = 0;
639         rd->collected_value = 0;
640         rd->last_collected_value = 0;
641         rd->collected_volume = 0;
642         rd->stored_volume = 0;
643         rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
644         rd->last_collected_time.tv_sec = 0;
645         rd->last_collected_time.tv_usec = 0;
646
647         // append this dimension
648         pthread_rwlock_wrlock(&st->rwlock);
649         if(!st->dimensions)
650                 st->dimensions = rd;
651         else {
652                 RRDDIM *td = st->dimensions;
653                 for(; td->next; td = td->next) ;
654                 td->next = rd;
655         }
656         pthread_rwlock_unlock(&st->rwlock);
657
658         rrddim_index_add(st, rd);
659
660         return(rd);
661 }
662
663 void rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name)
664 {
665         debug(D_RRD_CALLS, "rrddim_set_name() %s.%s", st->name, rd->name);
666
667         char varname[CONFIG_MAX_NAME + 1];
668         snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
669         config_set_default(st->id, varname, name);
670 }
671
672 void rrddim_free(RRDSET *st, RRDDIM *rd)
673 {
674         debug(D_RRD_CALLS, "rrddim_free() %s.%s", st->name, rd->name);
675
676         RRDDIM *i, *last = NULL;
677         for(i = st->dimensions; i && i != rd ; i = i->next) last = i;
678
679         if(!i) {
680                 error("Request to free dimension %s.%s but it is not linked.", st->id, rd->name);
681                 return;
682         }
683
684         if(last) last->next = rd->next;
685         else st->dimensions = rd->next;
686         rd->next = NULL;
687
688         rrddim_index_del(st, rd);
689
690         // free(rd->annotations);
691         if(rd->mapped == RRD_MEMORY_MODE_SAVE) {
692                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
693                 savememory(rd->cache_filename, rd, rd->memsize);
694
695                 debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
696                 munmap(rd, rd->memsize);
697         }
698         else if(rd->mapped == RRD_MEMORY_MODE_MAP) {
699                 debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
700                 munmap(rd, rd->memsize);
701         }
702         else {
703                 debug(D_RRD_CALLS, "Removing dimension '%s'.", rd->name);
704                 free(rd);
705         }
706 }
707
708 void rrdset_free_all(void)
709 {
710         info("Freeing all memory...");
711
712         RRDSET *st;
713         for(st = localhost.rrdset_root; st ;) {
714                 RRDSET *next = st->next;
715
716                 while(st->dimensions)
717                         rrddim_free(st, st->dimensions);
718
719                 rrdset_index_del(&localhost, st);
720
721         st->rrdcontext->use_count--;
722         if(!st->rrdcontext->use_count)
723             rrdcontext_free(st->rrdcontext);
724
725                 if(st->mapped == RRD_MEMORY_MODE_SAVE) {
726                         debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
727                         savememory(st->cache_filename, st, st->memsize);
728
729                         debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
730                         munmap(st, st->memsize);
731                 }
732                 else if(st->mapped == RRD_MEMORY_MODE_MAP) {
733                         debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
734                         munmap(st, st->memsize);
735                 }
736                 else
737                         free(st);
738
739                 st = next;
740         }
741     localhost.rrdset_root = NULL;
742
743         info("Memory cleanup completed...");
744 }
745
746 void rrdset_save_all(void) {
747         info("Saving database...");
748
749         RRDSET *st;
750         RRDDIM *rd;
751
752         pthread_rwlock_wrlock(&localhost.rrdset_root_rwlock);
753         for(st = localhost.rrdset_root; st ; st = st->next) {
754                 pthread_rwlock_wrlock(&st->rwlock);
755
756                 if(st->mapped == RRD_MEMORY_MODE_SAVE) {
757                         debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
758                         savememory(st->cache_filename, st, st->memsize);
759                 }
760
761                 for(rd = st->dimensions; rd ; rd = rd->next) {
762                         if(likely(rd->mapped == RRD_MEMORY_MODE_SAVE)) {
763                                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
764                                 savememory(rd->cache_filename, rd, rd->memsize);
765                         }
766                 }
767
768                 pthread_rwlock_unlock(&st->rwlock);
769         }
770         pthread_rwlock_unlock(&localhost.rrdset_root_rwlock);
771 }
772
773
774 RRDSET *rrdset_find(const char *id)
775 {
776         debug(D_RRD_CALLS, "rrdset_find() for chart %s", id);
777
778         RRDSET *st = rrdset_index_find(&localhost, id, 0);
779         return(st);
780 }
781
782 RRDSET *rrdset_find_bytype(const char *type, const char *id)
783 {
784         debug(D_RRD_CALLS, "rrdset_find_bytype() for chart %s.%s", type, id);
785
786         char buf[RRD_ID_LENGTH_MAX + 1];
787
788         strncpyz(buf, type, RRD_ID_LENGTH_MAX - 1);
789         strcat(buf, ".");
790         int len = (int) strlen(buf);
791         strncpyz(&buf[len], id, (size_t) (RRD_ID_LENGTH_MAX - len));
792
793         return(rrdset_find(buf));
794 }
795
796 RRDSET *rrdset_find_byname(const char *name)
797 {
798         debug(D_RRD_CALLS, "rrdset_find_byname() for chart %s", name);
799
800         RRDSET *st = rrdset_index_find_name(&localhost, name, 0);
801         return(st);
802 }
803
804 RRDDIM *rrddim_find(RRDSET *st, const char *id)
805 {
806         debug(D_RRD_CALLS, "rrddim_find() for chart %s, dimension %s", st->name, id);
807
808         return rrddim_index_find(st, id, 0);
809 }
810
811 int rrddim_hide(RRDSET *st, const char *id)
812 {
813         debug(D_RRD_CALLS, "rrddim_hide() for chart %s, dimension %s", st->name, id);
814
815         RRDDIM *rd = rrddim_find(st, id);
816         if(unlikely(!rd)) {
817                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
818                 return 1;
819         }
820
821         rd->flags |= RRDDIM_FLAG_HIDDEN;
822         return 0;
823 }
824
825 int rrddim_unhide(RRDSET *st, const char *id)
826 {
827         debug(D_RRD_CALLS, "rrddim_unhide() for chart %s, dimension %s", st->name, id);
828
829         RRDDIM *rd = rrddim_find(st, id);
830         if(unlikely(!rd)) {
831                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
832                 return 1;
833         }
834
835         if(rd->flags & RRDDIM_FLAG_HIDDEN) rd->flags ^= RRDDIM_FLAG_HIDDEN;
836         return 0;
837 }
838
839 collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value)
840 {
841         debug(D_RRD_CALLS, "rrddim_set_by_pointer() for chart %s, dimension %s, value " COLLECTED_NUMBER_FORMAT, st->name, rd->name, value);
842
843         gettimeofday(&rd->last_collected_time, NULL);
844         rd->collected_value = value;
845         rd->updated = 1;
846         rd->counter++;
847
848         return rd->last_collected_value;
849 }
850
851 collected_number rrddim_set(RRDSET *st, const char *id, collected_number value)
852 {
853         RRDDIM *rd = rrddim_find(st, id);
854         if(unlikely(!rd)) {
855                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
856                 return 0;
857         }
858
859         return rrddim_set_by_pointer(st, rd, value);
860 }
861
862 void rrdset_next_usec(RRDSET *st, unsigned long long microseconds)
863 {
864         if(!microseconds) rrdset_next(st);
865         else {
866                 debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
867
868                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
869                 st->usec_since_last_update = microseconds;
870         }
871 }
872
873 void rrdset_next(RRDSET *st)
874 {
875         unsigned long long microseconds = 0;
876
877         if(likely(st->last_collected_time.tv_sec)) {
878                 struct timeval now;
879                 gettimeofday(&now, NULL);
880                 microseconds = usecdiff(&now, &st->last_collected_time);
881         }
882         // prevent infinite loop
883         else microseconds = st->update_every * 1000000ULL;
884
885         rrdset_next_usec(st, microseconds);
886 }
887
888 void rrdset_next_plugins(RRDSET *st)
889 {
890         rrdset_next(st);
891 }
892
893 unsigned long long rrdset_done(RRDSET *st)
894 {
895         if(unlikely(netdata_exit)) return 0;
896
897         debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
898
899         RRDDIM *rd, *last;
900         int oldstate, store_this_entry = 1, first_entry = 0;
901         unsigned long long last_ut, now_ut, next_ut, stored_entries = 0;
902
903         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0))
904                 error("Cannot set pthread cancel state to DISABLE.");
905
906         // a read lock is OK here
907         pthread_rwlock_rdlock(&st->rwlock);
908
909         // enable the chart, if it was disabled
910         if(unlikely(rrd_delete_unupdated_dimensions) && !st->enabled)
911                 st->enabled = 1;
912
913         // check if the chart has a long time to be updated
914         if(unlikely(st->usec_since_last_update > st->entries * st->update_every * 1000000ULL)) {
915                 info("%s: took too long to be updated (%0.3Lf secs). Reseting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
916                 rrdset_reset(st);
917                 st->usec_since_last_update = st->update_every * 1000000ULL;
918                 first_entry = 1;
919         }
920         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
921
922         // set last_collected_time
923         if(unlikely(!st->last_collected_time.tv_sec)) {
924                 // it is the first entry
925                 // set the last_collected_time to now
926                 gettimeofday(&st->last_collected_time, NULL);
927
928                 // the first entry should not be stored
929                 store_this_entry = 0;
930                 first_entry = 1;
931
932                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
933         }
934         else {
935                 // it is not the first entry
936                 // calculate the proper last_collected_time, using usec_since_last_update
937                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec + st->usec_since_last_update;
938                 st->last_collected_time.tv_sec = (time_t) (ut / 1000000ULL);
939                 st->last_collected_time.tv_usec = (suseconds_t) (ut % 1000000ULL);
940         }
941
942         // if this set has not been updated in the past
943         // we fake the last_update time to be = now - usec_since_last_update
944         if(unlikely(!st->last_updated.tv_sec)) {
945                 // it has never been updated before
946                 // set a fake last_updated, in the past using usec_since_last_update
947                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
948                 st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
949                 st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
950
951                 // the first entry should not be stored
952                 store_this_entry = 0;
953                 first_entry = 1;
954
955                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
956         }
957
958         // check if we will re-write the entire data set
959         if(unlikely(usecdiff(&st->last_collected_time, &st->last_updated) > st->update_every * st->entries * 1000000ULL)) {
960                 info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Reseting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
961                 rrdset_reset(st);
962
963                 st->usec_since_last_update = st->update_every * 1000000ULL;
964
965                 gettimeofday(&st->last_collected_time, NULL);
966
967                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
968                 st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
969                 st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
970
971                 // the first entry should not be stored
972                 store_this_entry = 0;
973                 first_entry = 1;
974         }
975
976         // these are the 3 variables that will help us in interpolation
977         // last_ut = the last time we added a value to the storage
978         //  now_ut = the time the current value is taken at
979         // next_ut = the time of the next interpolation point
980         last_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec;
981         now_ut  = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec;
982         next_ut = (st->last_updated.tv_sec + st->update_every) * 1000000ULL;
983
984         if(unlikely(!first_entry && now_ut < next_ut)) {
985                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
986         }
987
988         if(unlikely(st->debug)) {
989                 debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
990                 debug(D_RRD_STATS, "%s: now  ut = %0.3Lf (current update time)", st->name, (long double)now_ut/1000000.0);
991                 debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
992         }
993
994         if(unlikely(!st->counter_done)) {
995                 store_this_entry = 0;
996                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
997         }
998         st->counter_done++;
999
1000         // calculate totals and count the dimensions
1001         int dimensions;
1002         st->collected_total = 0;
1003         for( rd = st->dimensions, dimensions = 0 ; likely(rd) ; rd = rd->next, dimensions++ )
1004                 if(likely(rd->updated)) st->collected_total += rd->collected_value;
1005
1006         uint32_t storage_flags = SN_EXISTS;
1007
1008         // process all dimensions to calculate their values
1009         // based on the collected figures only
1010         // at this stage we do not interpolate anything
1011         for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1012
1013                 if(unlikely(!rd->updated)) {
1014                         rd->calculated_value = 0;
1015                         continue;
1016                 }
1017
1018                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: START "
1019                         " last_collected_value = " COLLECTED_NUMBER_FORMAT
1020                         " collected_value = " COLLECTED_NUMBER_FORMAT
1021                         " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1022                         " calculated_value = " CALCULATED_NUMBER_FORMAT
1023                         , st->id, rd->name
1024                         , rd->last_collected_value
1025                         , rd->collected_value
1026                         , rd->last_calculated_value
1027                         , rd->calculated_value
1028                         );
1029
1030                 switch(rd->algorithm) {
1031                         case RRDDIM_ABSOLUTE:
1032                                 rd->calculated_value = (calculated_number)rd->collected_value
1033                                         * (calculated_number)rd->multiplier
1034                                         / (calculated_number)rd->divisor;
1035
1036                                 if(unlikely(st->debug))
1037                                         debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
1038                                                 CALCULATED_NUMBER_FORMAT " = "
1039                                                 COLLECTED_NUMBER_FORMAT
1040                                                 " * " CALCULATED_NUMBER_FORMAT
1041                                                 " / " CALCULATED_NUMBER_FORMAT
1042                                                 , st->id, rd->name
1043                                                 , rd->calculated_value
1044                                                 , rd->collected_value
1045                                                 , (calculated_number)rd->multiplier
1046                                                 , (calculated_number)rd->divisor
1047                                                 );
1048                                 break;
1049
1050                         case RRDDIM_PCENT_OVER_ROW_TOTAL:
1051                                 if(unlikely(!st->collected_total))
1052                                         rd->calculated_value = 0;
1053                                 else
1054                                         // the percentage of the current value
1055                                         // over the total of all dimensions
1056                                         rd->calculated_value =
1057                                               (calculated_number)100
1058                                             * (calculated_number)rd->collected_value
1059                                             / (calculated_number)st->collected_total;
1060
1061                                 if(unlikely(st->debug))
1062                                         debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
1063                                                 CALCULATED_NUMBER_FORMAT " = 100"
1064                                                 " * " COLLECTED_NUMBER_FORMAT
1065                                                 " / " COLLECTED_NUMBER_FORMAT
1066                                                 , st->id, rd->name
1067                                                 , rd->calculated_value
1068                                                 , rd->collected_value
1069                                                 , st->collected_total
1070                                                 );
1071                                 break;
1072
1073                         case RRDDIM_INCREMENTAL:
1074                                 if(unlikely(rd->counter <= 1)) {
1075                                         rd->calculated_value = 0;
1076                                         continue;
1077                                 }
1078
1079                                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1080                                 // to reset the calculation (it will give zero as the calculation for this second)
1081                                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1082                                         debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1083                                                         , st->name, rd->name
1084                                                         , rd->last_collected_value
1085                                                         , rd->collected_value);
1086                                         if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1087                                         rd->last_collected_value = rd->collected_value;
1088                                 }
1089
1090                                 rd->calculated_value =
1091                                       (calculated_number)(rd->collected_value - rd->last_collected_value)
1092                                     * (calculated_number)rd->multiplier
1093                                     / (calculated_number)rd->divisor;
1094
1095                                 if(unlikely(st->debug))
1096                                         debug(D_RRD_STATS, "%s/%s: CALC INC PRE "
1097                                                 CALCULATED_NUMBER_FORMAT " = ("
1098                                                 COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
1099                                                 ")"
1100                                                 " * " CALCULATED_NUMBER_FORMAT
1101                                                 " / " CALCULATED_NUMBER_FORMAT
1102                                                 , st->id, rd->name
1103                                                 , rd->calculated_value
1104                                                 , rd->collected_value, rd->last_collected_value
1105                                                 , (calculated_number)rd->multiplier
1106                                                 , (calculated_number)rd->divisor
1107                                                 );
1108                                 break;
1109
1110                         case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1111                                 if(unlikely(rd->counter <= 1)) {
1112                                         rd->calculated_value = 0;
1113                                         continue;
1114                                 }
1115
1116                                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1117                                 // to reset the calculation (it will give zero as the calculation for this second)
1118                                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1119                                         debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1120                                         , st->name, rd->name
1121                                         , rd->last_collected_value
1122                                         , rd->collected_value);
1123                                         if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1124                                         rd->last_collected_value = rd->collected_value;
1125                                 }
1126
1127                                 // the percentage of the current increment
1128                                 // over the increment of all dimensions together
1129                                 if(unlikely(st->collected_total == st->last_collected_total))
1130                                         rd->calculated_value = 0;
1131                                 else
1132                                         rd->calculated_value =
1133                                                   (calculated_number)100
1134                                                 * (calculated_number)(rd->collected_value - rd->last_collected_value)
1135                                                 / (calculated_number)(st->collected_total - st->last_collected_total);
1136
1137                                 if(unlikely(st->debug))
1138                                         debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
1139                                                 CALCULATED_NUMBER_FORMAT " = 100"
1140                                                 " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1141                                                 " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1142                                                 , st->id, rd->name
1143                                                 , rd->calculated_value
1144                                                 , rd->collected_value, rd->last_collected_value
1145                                                 , st->collected_total, st->last_collected_total
1146                                                 );
1147                                 break;
1148
1149                         default:
1150                                 // make the default zero, to make sure
1151                                 // it gets noticed when we add new types
1152                                 rd->calculated_value = 0;
1153
1154                                 if(unlikely(st->debug))
1155                                         debug(D_RRD_STATS, "%s/%s: CALC "
1156                                                 CALCULATED_NUMBER_FORMAT " = 0"
1157                                                 , st->id, rd->name
1158                                                 , rd->calculated_value
1159                                                 );
1160                                 break;
1161                 }
1162
1163                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: PHASE2 "
1164                         " last_collected_value = " COLLECTED_NUMBER_FORMAT
1165                         " collected_value = " COLLECTED_NUMBER_FORMAT
1166                         " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1167                         " calculated_value = " CALCULATED_NUMBER_FORMAT
1168                         , st->id, rd->name
1169                         , rd->last_collected_value
1170                         , rd->collected_value
1171                         , rd->last_calculated_value
1172                         , rd->calculated_value
1173                         );
1174
1175         }
1176
1177         // at this point we have all the calculated values ready
1178         // it is now time to interpolate values on a second boundary
1179
1180         unsigned long long first_ut = last_ut;
1181         long long iterations = (now_ut - last_ut) / (st->update_every * 1000000ULL);
1182         if((now_ut % (st->update_every * 1000000ULL)) == 0) iterations++;
1183
1184         for( ; likely(next_ut <= now_ut) ; next_ut += st->update_every * 1000000ULL, iterations-- ) {
1185 #ifdef NETDATA_INTERNAL_CHECKS
1186                 if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_ut = %llu, next_ut = %llu, now_ut = %llu", st->name, first_ut, last_ut, next_ut, now_ut); }
1187 #endif
1188
1189                 if(unlikely(st->debug)) {
1190                         debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
1191                         debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
1192                 }
1193
1194                 st->last_updated.tv_sec = (time_t) (next_ut / 1000000ULL);
1195                 st->last_updated.tv_usec = 0;
1196
1197                 for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1198                         calculated_number new_value;
1199
1200                         switch(rd->algorithm) {
1201                                 case RRDDIM_INCREMENTAL:
1202                                         new_value = (calculated_number)
1203                                                 (          rd->calculated_value
1204                                                         * (calculated_number)(next_ut - last_ut)
1205                                                         / (calculated_number)(now_ut - last_ut)
1206                                                 );
1207
1208                                         if(unlikely(st->debug))
1209                                                 debug(D_RRD_STATS, "%s/%s: CALC2 INC "
1210                                                         CALCULATED_NUMBER_FORMAT " = "
1211                                                         CALCULATED_NUMBER_FORMAT
1212                                                         " * %llu"
1213                                                         " / %llu"
1214                                                         , st->id, rd->name
1215                                                         , new_value
1216                                                         , rd->calculated_value
1217                                                         , (next_ut - last_ut)
1218                                                         , (now_ut - last_ut)
1219                                                         );
1220
1221                                         rd->calculated_value -= new_value;
1222                                         new_value += rd->last_calculated_value;
1223                                         rd->last_calculated_value = 0;
1224                                         new_value /= (calculated_number)st->update_every;
1225                                         break;
1226
1227                                 case RRDDIM_ABSOLUTE:
1228                                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
1229                                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1230                                 default:
1231                                         if(iterations == 1) {
1232                                                 // this is the last iteration
1233                                                 // do not interpolate
1234                                                 // just show the calculated value
1235
1236                                                 new_value = rd->calculated_value;
1237                                         }
1238                                         else {
1239                                                 // we have missed an update
1240                                                 // interpolate in the middle values
1241
1242                                                 new_value = (calculated_number)
1243                                                         (       (         (rd->calculated_value - rd->last_calculated_value)
1244                                                                         * (calculated_number)(next_ut - first_ut)
1245                                                                         / (calculated_number)(now_ut - first_ut)
1246                                                                 )
1247                                                                 +  rd->last_calculated_value
1248                                                         );
1249
1250                                                 if(unlikely(st->debug))
1251                                                         debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
1252                                                                 CALCULATED_NUMBER_FORMAT " = ((("
1253                                                                 "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
1254                                                                 " * %llu"
1255                                                                 " / %llu) + " CALCULATED_NUMBER_FORMAT
1256                                                                 , st->id, rd->name
1257                                                                 , new_value
1258                                                                 , rd->calculated_value, rd->last_calculated_value
1259                                                                 , (next_ut - first_ut)
1260                                                                 , (now_ut - first_ut), rd->last_calculated_value
1261                                                                 );
1262
1263                                                 // this is wrong
1264                                                 // it fades the value towards the target
1265                                                 // while we know the calculated value is different
1266                                                 // if(likely(next_ut + st->update_every * 1000000ULL > now_ut)) rd->calculated_value = new_value;
1267                                         }
1268                                         break;
1269                         }
1270
1271                         if(unlikely(!store_this_entry)) {
1272                                 store_this_entry = 1;
1273                                 continue;
1274                         }
1275
1276                         if(likely(rd->updated && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
1277                                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
1278
1279                                 if(unlikely(st->debug))
1280                                         debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
1281                                                 CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
1282                                                 , st->id, rd->name
1283                                                 , st->current_entry
1284                                                 , unpack_storage_number(rd->values[st->current_entry]), new_value
1285                                                 );
1286                         }
1287                         else {
1288                                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
1289                                                 , st->id, rd->name
1290                                                 , st->current_entry
1291                                                 );
1292                                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
1293                         }
1294
1295                         stored_entries++;
1296
1297                         if(unlikely(st->debug)) {
1298                                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
1299                                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
1300                                 calculated_number accuracy = accuracy_loss(t1, t2);
1301                                 debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
1302                                                 , st->id, rd->name
1303                                                 , st->current_entry
1304                                                 , t2
1305                                                 , get_storage_number_flags(rd->values[st->current_entry])
1306                                                 , t1
1307                                                 , accuracy
1308                                                 , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1309                                                 );
1310
1311                                 rd->collected_volume += t1;
1312                                 rd->stored_volume += t2;
1313                                 accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
1314                                 debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated  = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
1315                                                 , st->id, rd->name
1316                                                 , st->current_entry
1317                                                 , rd->stored_volume
1318                                                 , rd->collected_volume
1319                                                 , accuracy
1320                                                 , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1321                                                 );
1322
1323                         }
1324                 }
1325                 // reset the storage flags for the next point, if any;
1326                 storage_flags = SN_EXISTS;
1327
1328                 st->counter++;
1329                 st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
1330                 last_ut = next_ut;
1331         }
1332
1333         // align next interpolation to last collection point
1334         if(likely(stored_entries || !store_this_entry)) {
1335                 st->last_updated.tv_sec = st->last_collected_time.tv_sec;
1336                 st->last_updated.tv_usec = st->last_collected_time.tv_usec;
1337                 st->last_collected_total  = st->collected_total;
1338         }
1339
1340         for( rd = st->dimensions; likely(rd) ; rd = rd->next ) {
1341                 if(unlikely(!rd->updated)) continue;
1342
1343                 if(likely(stored_entries || !store_this_entry)) {
1344                         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
1345                         rd->last_collected_value = rd->collected_value;
1346
1347                         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
1348                         rd->last_calculated_value = rd->calculated_value;
1349                 }
1350
1351                 rd->calculated_value = 0;
1352                 rd->collected_value = 0;
1353                 rd->updated = 0;
1354
1355                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: END "
1356                         " last_collected_value = " COLLECTED_NUMBER_FORMAT
1357                         " collected_value = " COLLECTED_NUMBER_FORMAT
1358                         " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1359                         " calculated_value = " CALCULATED_NUMBER_FORMAT
1360                         , st->id, rd->name
1361                         , rd->last_collected_value
1362                         , rd->collected_value
1363                         , rd->last_calculated_value
1364                         , rd->calculated_value
1365                         );
1366         }
1367
1368         // ALL DONE ABOUT THE DATA UPDATE
1369         // --------------------------------------------------------------------
1370
1371         // find if there are any obsolete dimensions (not updated recently)
1372         if(unlikely(rrd_delete_unupdated_dimensions)) {
1373
1374                 for( rd = st->dimensions; likely(rd) ; rd = rd->next )
1375                         if((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)
1376                                 break;
1377
1378                 if(unlikely(rd)) {
1379                         // there is at least one dimension to free
1380                         // release our lock and re-acquire it as a write lock (this is not an atomic upgrade)
1381                         pthread_rwlock_unlock(&st->rwlock);
1382                         pthread_rwlock_wrlock(&st->rwlock);
1383
1384                         for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
1385                                 // remove it only if it has not been updated for (rrd_delete_unupdated_dimensions * st->update_every) seconds
1386
1387                                 if(unlikely((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)) {
1388                                         info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
1389
1390                                         if(unlikely(!last)) {
1391                                                 st->dimensions = rd->next;
1392                                                 rd->next = NULL;
1393                                                 rrddim_free(st, rd);
1394                                                 rd = st->dimensions;
1395                                                 continue;
1396                                         }
1397                                         else {
1398                                                 last->next = rd->next;
1399                                                 rd->next = NULL;
1400                                                 rrddim_free(st, rd);
1401                                                 rd = last->next;
1402                                                 continue;
1403                                         }
1404                                 }
1405
1406                                 last = rd;
1407                                 rd = rd->next;
1408                         }
1409
1410                         if(unlikely(!st->dimensions)) {
1411                                 info("Disabling chart %s (%s) since it does not have any dimensions", st->name, st->id);
1412                                 st->enabled = 0;
1413                         }
1414                 }
1415         }
1416
1417         pthread_rwlock_unlock(&st->rwlock);
1418
1419         if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0))
1420                 error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
1421
1422         return(st->usec_since_last_update);
1423 }