]> arthur.barton.de Git - netdata.git/blob - src/rrd.c
Merge remote-tracking branch 'firehol/master'
[netdata.git] / src / rrd.c
1 #include "common.h"
2
3 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
4
5 // ----------------------------------------------------------------------------
6 // globals
7
8 // if not zero it gives the time (in seconds) to remove un-updated dimensions
9 // DO NOT ENABLE
10 // if dimensions are removed, the chart generation will have to run again
11 int rrd_delete_unupdated_dimensions = 0;
12
13 int rrd_update_every = UPDATE_EVERY;
14 int rrd_default_history_entries = RRD_DEFAULT_HISTORY_ENTRIES;
15 int rrd_memory_mode = RRD_MEMORY_MODE_SAVE;
16
17 static int rrdset_compare(void* a, void* b);
18 static int rrdset_compare_name(void* a, void* b);
19 static int rrdfamily_compare(void *a, void *b);
20
21 // ----------------------------------------------------------------------------
22 // RRDHOST
23
24 RRDHOST localhost = {
25         .hostname = "localhost",
26         .rrdset_root = NULL,
27         .rrdset_root_rwlock = PTHREAD_RWLOCK_INITIALIZER,
28         .rrdset_root_index = {
29             { NULL, rrdset_compare },
30             AVL_LOCK_INITIALIZER
31         },
32         .rrdset_root_index_name = {
33             { NULL, rrdset_compare_name },
34             AVL_LOCK_INITIALIZER
35         },
36         .rrdfamily_root_index = {
37             { NULL, rrdfamily_compare },
38             AVL_LOCK_INITIALIZER
39         },
40         .variables_root_index = {
41             { NULL, rrdvar_compare },
42             AVL_LOCK_INITIALIZER
43         }
44 };
45
46 void rrdhost_rwlock(RRDHOST *host) {
47     pthread_rwlock_wrlock(&host->rrdset_root_rwlock);
48 }
49
50 void rrdhost_rdlock(RRDHOST *host) {
51     pthread_rwlock_rdlock(&host->rrdset_root_rwlock);
52 }
53
54 void rrdhost_unlock(RRDHOST *host) {
55     pthread_rwlock_unlock(&host->rrdset_root_rwlock);
56 }
57
58 void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
59     int ret = pthread_rwlock_trywrlock(&host->rrdset_root_rwlock);
60
61     if(ret == 0)
62         fatal("RRDHOST '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
63 }
64
65 void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
66     int ret = pthread_rwlock_tryrdlock(&host->rrdset_root_rwlock);
67
68     if(ret == 0)
69         fatal("RRDHOST '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
70 }
71
72 // ----------------------------------------------------------------------------
73 // RRDFAMILY index
74
75 static int rrdfamily_compare(void *a, void *b) {
76     if(((RRDFAMILY *)a)->hash_family < ((RRDFAMILY *)b)->hash_family) return -1;
77     else if(((RRDFAMILY *)a)->hash_family > ((RRDFAMILY *)b)->hash_family) return 1;
78     else return strcmp(((RRDFAMILY *)a)->family, ((RRDFAMILY *)b)->family);
79 }
80
// Insert / remove an RRDFAMILY in the host's family index.
// Both evaluate to whatever the AVL call returns, cast to RRDFAMILY *.
// The whole expansion is parenthesized so that the cast also applies when the
// macro result is followed by a postfix operator (e.g. "->").
#define rrdfamily_index_add(host, rc) ((RRDFAMILY *)avl_insert_lock(&((host)->rrdfamily_root_index), (avl *)(rc)))
#define rrdfamily_index_del(host, rc) ((RRDFAMILY *)avl_remove_lock(&((host)->rrdfamily_root_index), (avl *)(rc)))
83
84 static RRDFAMILY *rrdfamily_index_find(RRDHOST *host, const char *id, uint32_t hash) {
85     RRDFAMILY tmp;
86     tmp.family = id;
87     tmp.hash_family = (hash)?hash:simple_hash(tmp.family);
88
89     return (RRDFAMILY *)avl_search_lock(&(host->rrdfamily_root_index), (avl *) &tmp);
90 }
91
92 RRDFAMILY *rrdfamily_create(const char *id) {
93     RRDFAMILY *rc = rrdfamily_index_find(&localhost, id, 0);
94     if(!rc) {
95         rc = callocz(1, sizeof(RRDFAMILY));
96
97         rc->family = strdupz(id);
98         rc->hash_family = simple_hash(rc->family);
99
100         // initialize the variables index
101         avl_init_lock(&rc->variables_root_index, rrdvar_compare);
102
103         RRDFAMILY *ret = rrdfamily_index_add(&localhost, rc);
104         if(ret != rc)
105             fatal("INTERNAL ERROR: Expected to INSERT RRDFAMILY '%s' into index, but inserted '%s'.", rc->family, (ret)?ret->family:"NONE");
106     }
107
108     rc->use_count++;
109     return rc;
110 }
111
112 void rrdfamily_free(RRDFAMILY *rc) {
113     rc->use_count--;
114     if(!rc->use_count) {
115         RRDFAMILY *ret = rrdfamily_index_del(&localhost, rc);
116         if(ret != rc)
117             fatal("INTERNAL ERROR: Expected to DELETE RRDFAMILY '%s' from index, but deleted '%s'.", rc->family, (ret)?ret->family:"NONE");
118
119         if(rc->variables_root_index.avl_tree.root != NULL)
120             fatal("INTERNAL ERROR: Variables index of RRDFAMILY '%s' that is freed, is not empty.", rc->family);
121
122         freez((void *)rc->family);
123         freez(rc);
124     }
125 }
126
127 // ----------------------------------------------------------------------------
128 // RRDSET index
129
130 static int rrdset_compare(void* a, void* b) {
131     if(((RRDSET *)a)->hash < ((RRDSET *)b)->hash) return -1;
132     else if(((RRDSET *)a)->hash > ((RRDSET *)b)->hash) return 1;
133     else return strcmp(((RRDSET *)a)->id, ((RRDSET *)b)->id);
134 }
135
// Insert / remove a chart in the host's id index.
// Both evaluate to whatever the AVL call returns, cast to RRDSET *.
// The whole expansion is parenthesized so that the cast also applies when the
// macro result is followed by a postfix operator (e.g. "->").
#define rrdset_index_add(host, st) ((RRDSET *)avl_insert_lock(&((host)->rrdset_root_index), (avl *)(st)))
#define rrdset_index_del(host, st) ((RRDSET *)avl_remove_lock(&((host)->rrdset_root_index), (avl *)(st)))
138
139 static RRDSET *rrdset_index_find(RRDHOST *host, const char *id, uint32_t hash) {
140     RRDSET tmp;
141     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
142     tmp.hash = (hash)?hash:simple_hash(tmp.id);
143
144     return (RRDSET *)avl_search_lock(&(host->rrdset_root_index), (avl *) &tmp);
145 }
146
147 // ----------------------------------------------------------------------------
148 // RRDSET name index
149
// Convert a pointer to the 'avlname' member of an RRDSET back to a pointer to
// the RRDSET that contains it.
// The pointer is cast to char * before the subtraction: callers pass void *,
// and arithmetic on void * is a compiler extension, not standard C.
#define rrdset_from_avlname(avlname_ptr) ((RRDSET *)((char *)(avlname_ptr) - offsetof(RRDSET, avlname)))
151
152 static int rrdset_compare_name(void* a, void* b) {
153     RRDSET *A = rrdset_from_avlname(a);
154     RRDSET *B = rrdset_from_avlname(b);
155
156     // fprintf(stderr, "COMPARING: %s with %s\n", A->name, B->name);
157
158     if(A->hash_name < B->hash_name) return -1;
159     else if(A->hash_name > B->hash_name) return 1;
160     else return strcmp(A->name, B->name);
161 }
162
163 RRDSET *rrdset_index_add_name(RRDHOST *host, RRDSET *st) {
164     void *result;
165     // fprintf(stderr, "ADDING: %s (name: %s)\n", st->id, st->name);
166     result = avl_insert_lock(&host->rrdset_root_index_name, (avl *) (&st->avlname));
167     if(result) return rrdset_from_avlname(result);
168     return NULL;
169 }
170
171 RRDSET *rrdset_index_del_name(RRDHOST *host, RRDSET *st) {
172     void *result;
173     // fprintf(stderr, "DELETING: %s (name: %s)\n", st->id, st->name);
174     result = (RRDSET *)avl_remove_lock(&((host)->rrdset_root_index_name), (avl *)(&st->avlname));
175     if(result) return rrdset_from_avlname(result);
176     return NULL;
177 }
178
179 static RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, uint32_t hash) {
180     void *result = NULL;
181     RRDSET tmp;
182     tmp.name = name;
183     tmp.hash_name = (hash)?hash:simple_hash(tmp.name);
184
185     // fprintf(stderr, "SEARCHING: %s\n", name);
186     result = avl_search_lock(&host->rrdset_root_index_name, (avl *) (&(tmp.avlname)));
187     if(result) {
188         RRDSET *st = rrdset_from_avlname(result);
189         if(strcmp(st->magic, RRDSET_MAGIC))
190             error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name);
191
192         // fprintf(stderr, "FOUND: %s\n", name);
193         return rrdset_from_avlname(result);
194     }
195     // fprintf(stderr, "NOT FOUND: %s\n", name);
196     return NULL;
197 }
198
199
200 // ----------------------------------------------------------------------------
201 // RRDDIM index
202
203 static int rrddim_compare(void* a, void* b) {
204     if(((RRDDIM *)a)->hash < ((RRDDIM *)b)->hash) return -1;
205     else if(((RRDDIM *)a)->hash > ((RRDDIM *)b)->hash) return 1;
206     else return strcmp(((RRDDIM *)a)->id, ((RRDDIM *)b)->id);
207 }
208
// Insert / remove a dimension in its chart's dimensions index.
// Both evaluate to the (uncast) return value of the AVL call.
#define rrddim_index_add(st, rd) (avl_insert_lock(&((st)->dimensions_index), (avl *)(rd)))
#define rrddim_index_del(st, rd) (avl_remove_lock(&((st)->dimensions_index), (avl *)(rd)))
211
212 static RRDDIM *rrddim_index_find(RRDSET *st, const char *id, uint32_t hash) {
213     RRDDIM tmp;
214     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
215     tmp.hash = (hash)?hash:simple_hash(tmp.id);
216
217     return (RRDDIM *)avl_search_lock(&(st->dimensions_index), (avl *) &tmp);
218 }
219
220 // ----------------------------------------------------------------------------
221 // chart types
222
223 int rrdset_type_id(const char *name)
224 {
225     if(unlikely(strcmp(name, RRDSET_TYPE_AREA_NAME) == 0)) return RRDSET_TYPE_AREA;
226     else if(unlikely(strcmp(name, RRDSET_TYPE_STACKED_NAME) == 0)) return RRDSET_TYPE_STACKED;
227     else if(unlikely(strcmp(name, RRDSET_TYPE_LINE_NAME) == 0)) return RRDSET_TYPE_LINE;
228     return RRDSET_TYPE_LINE;
229 }
230
231 const char *rrdset_type_name(int chart_type)
232 {
233     static char line[] = RRDSET_TYPE_LINE_NAME;
234     static char area[] = RRDSET_TYPE_AREA_NAME;
235     static char stacked[] = RRDSET_TYPE_STACKED_NAME;
236
237     switch(chart_type) {
238         case RRDSET_TYPE_LINE:
239             return line;
240
241         case RRDSET_TYPE_AREA:
242             return area;
243
244         case RRDSET_TYPE_STACKED:
245             return stacked;
246     }
247     return line;
248 }
249
250 // ----------------------------------------------------------------------------
251 // load / save
252
253 const char *rrd_memory_mode_name(int id)
254 {
255     static const char ram[] = RRD_MEMORY_MODE_RAM_NAME;
256     static const char map[] = RRD_MEMORY_MODE_MAP_NAME;
257     static const char save[] = RRD_MEMORY_MODE_SAVE_NAME;
258
259     switch(id) {
260         case RRD_MEMORY_MODE_RAM:
261             return ram;
262
263         case RRD_MEMORY_MODE_MAP:
264             return map;
265
266         case RRD_MEMORY_MODE_SAVE:
267         default:
268             return save;
269     }
270
271     return save;
272 }
273
274 int rrd_memory_mode_id(const char *name)
275 {
276     if(unlikely(!strcmp(name, RRD_MEMORY_MODE_RAM_NAME)))
277         return RRD_MEMORY_MODE_RAM;
278     else if(unlikely(!strcmp(name, RRD_MEMORY_MODE_MAP_NAME)))
279         return RRD_MEMORY_MODE_MAP;
280
281     return RRD_MEMORY_MODE_SAVE;
282 }
283
284 // ----------------------------------------------------------------------------
285 // algorithms types
286
287 int rrddim_algorithm_id(const char *name)
288 {
289     if(strcmp(name, RRDDIM_INCREMENTAL_NAME) == 0)          return RRDDIM_INCREMENTAL;
290     if(strcmp(name, RRDDIM_ABSOLUTE_NAME) == 0)             return RRDDIM_ABSOLUTE;
291     if(strcmp(name, RRDDIM_PCENT_OVER_ROW_TOTAL_NAME) == 0)         return RRDDIM_PCENT_OVER_ROW_TOTAL;
292     if(strcmp(name, RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME) == 0)    return RRDDIM_PCENT_OVER_DIFF_TOTAL;
293     return RRDDIM_ABSOLUTE;
294 }
295
296 const char *rrddim_algorithm_name(int chart_type)
297 {
298     static char absolute[] = RRDDIM_ABSOLUTE_NAME;
299     static char incremental[] = RRDDIM_INCREMENTAL_NAME;
300     static char percentage_of_absolute_row[] = RRDDIM_PCENT_OVER_ROW_TOTAL_NAME;
301     static char percentage_of_incremental_row[] = RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME;
302
303     switch(chart_type) {
304         case RRDDIM_ABSOLUTE:
305             return absolute;
306
307         case RRDDIM_INCREMENTAL:
308             return incremental;
309
310         case RRDDIM_PCENT_OVER_ROW_TOTAL:
311             return percentage_of_absolute_row;
312
313         case RRDDIM_PCENT_OVER_DIFF_TOTAL:
314             return percentage_of_incremental_row;
315     }
316     return absolute;
317 }
318
319 // ----------------------------------------------------------------------------
320 // chart names
321
// Copy at most 'length' characters of 'from' into 'to', replacing every
// character that is neither '.' nor alphanumeric with '_'. The result is
// always NUL terminated, so 'to' must have room for length + 1 bytes.
// Returns 'to'.
char *rrdset_strncpyz_name(char *to, const char *from, size_t length)
{
    char *p = to;

    for(; length && *from; length--, from++) {
        // <ctype.h> functions are only defined for unsigned char values (and
        // EOF); a negative plain char would be undefined behavior
        unsigned char c = (unsigned char)*from;

        *p++ = (c == '.' || isalnum(c)) ? (char)c : '_';
    }

    *p = '\0';

    return to;
}
337
338 void rrdset_set_name(RRDSET *st, const char *name)
339 {
340     debug(D_RRD_CALLS, "rrdset_set_name() old: %s, new: %s", st->name, name);
341
342     if(st->name) {
343         rrdset_index_del_name(&localhost, st);
344         rrdsetvar_rename_all(st);
345     }
346
347     char b[CONFIG_MAX_VALUE + 1];
348     char n[RRD_ID_LENGTH_MAX + 1];
349
350     snprintfz(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
351     rrdset_strncpyz_name(b, n, CONFIG_MAX_VALUE);
352     st->name = config_get(st->id, "name", b);
353     st->hash_name = simple_hash(st->name);
354
355     rrdset_index_add_name(&localhost, st);
356 }
357
358 // ----------------------------------------------------------------------------
359 // cache directory
360
361 char *rrdset_cache_dir(const char *id)
362 {
363     char *ret = NULL;
364
365     static char *cache_dir = NULL;
366     if(!cache_dir) {
367         cache_dir = config_get("global", "cache directory", CACHE_DIR);
368         int r = mkdir(cache_dir, 0755);
369         if(r != 0 && errno != EEXIST)
370             error("Cannot create directory '%s'", cache_dir);
371     }
372
373     char b[FILENAME_MAX + 1];
374     char n[FILENAME_MAX + 1];
375     rrdset_strncpyz_name(b, id, FILENAME_MAX);
376
377     snprintfz(n, FILENAME_MAX, "%s/%s", cache_dir, b);
378     ret = config_get(id, "cache directory", n);
379
380     if(rrd_memory_mode == RRD_MEMORY_MODE_MAP || rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
381         int r = mkdir(ret, 0775);
382         if(r != 0 && errno != EEXIST)
383             error("Cannot create directory '%s'", ret);
384     }
385
386     return ret;
387 }
388
389 // ----------------------------------------------------------------------------
390 // core functions
391
392 void rrdset_reset(RRDSET *st)
393 {
394     debug(D_RRD_CALLS, "rrdset_reset() %s", st->name);
395
396     st->last_collected_time.tv_sec = 0;
397     st->last_collected_time.tv_usec = 0;
398     st->last_updated.tv_sec = 0;
399     st->last_updated.tv_usec = 0;
400     st->current_entry = 0;
401     st->counter = 0;
402     st->counter_done = 0;
403
404     RRDDIM *rd;
405     for(rd = st->dimensions; rd ; rd = rd->next) {
406         rd->last_collected_time.tv_sec = 0;
407         rd->last_collected_time.tv_usec = 0;
408         rd->counter = 0;
409         bzero(rd->values, rd->entries * sizeof(storage_number));
410     }
411 }
412
413 RRDSET *rrdset_create(const char *type, const char *id, const char *name, const char *family, const char *context, const char *title, const char *units, long priority, int update_every, int chart_type)
414 {
415     if(!type || !type[0]) {
416         fatal("Cannot create rrd stats without a type.");
417         return NULL;
418     }
419
420     if(!id || !id[0]) {
421         fatal("Cannot create rrd stats without an id.");
422         return NULL;
423     }
424
425     char fullid[RRD_ID_LENGTH_MAX + 1];
426     char fullfilename[FILENAME_MAX + 1];
427     RRDSET *st = NULL;
428
429     snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
430
431     st = rrdset_find(fullid);
432     if(st) {
433         error("Cannot create rrd stats for '%s', it already exists.", fullid);
434         return st;
435     }
436
437     long entries = config_get_number(fullid, "history", rrd_default_history_entries);
438     if(entries < 5) entries = config_set_number(fullid, "history", 5);
439     if(entries > RRD_HISTORY_ENTRIES_MAX) entries = config_set_number(fullid, "history", RRD_HISTORY_ENTRIES_MAX);
440
441     int enabled = config_get_boolean(fullid, "enabled", 1);
442     if(!enabled) entries = 5;
443
444     unsigned long size = sizeof(RRDSET);
445     char *cache_dir = rrdset_cache_dir(fullid);
446
447     debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
448
449     snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
450     if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0);
451     if(st) {
452         if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
453             errno = 0;
454             info("Initializing file %s.", fullfilename);
455             bzero(st, size);
456         }
457         else if(strcmp(st->id, fullid) != 0) {
458             errno = 0;
459             error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
460             // munmap(st, size);
461             // st = NULL;
462             bzero(st, size);
463         }
464         else if(st->memsize != size || st->entries != entries) {
465             errno = 0;
466             error("File %s does not have the desired size. Clearing it.", fullfilename);
467             bzero(st, size);
468         }
469         else if(st->update_every != update_every) {
470             errno = 0;
471             error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
472             bzero(st, size);
473         }
474         else if((time(NULL) - st->last_updated.tv_sec) > update_every * entries) {
475             errno = 0;
476             error("File %s is too old. Clearing it.", fullfilename);
477             bzero(st, size);
478         }
479     }
480
481     if(st) {
482         st->name = NULL;
483         st->type = NULL;
484         st->family = NULL;
485         st->context = NULL;
486         st->title = NULL;
487         st->units = NULL;
488         st->dimensions = NULL;
489         st->next = NULL;
490         st->mapped = rrd_memory_mode;
491         st->variables = NULL;
492     }
493     else {
494         st = callocz(1, size);
495         st->mapped = RRD_MEMORY_MODE_RAM;
496     }
497
498     st->memsize = size;
499     st->entries = entries;
500     st->update_every = update_every;
501
502     if(st->current_entry >= st->entries) st->current_entry = 0;
503
504     strcpy(st->cache_filename, fullfilename);
505     strcpy(st->magic, RRDSET_MAGIC);
506
507     strcpy(st->id, fullid);
508     st->hash = simple_hash(st->id);
509
510     st->cache_dir = cache_dir;
511
512     st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
513     st->type       = config_get(st->id, "type", type);
514     st->family     = config_get(st->id, "family", family?family:st->type);
515     st->units      = config_get(st->id, "units", units?units:"");
516
517     st->context    = config_get(st->id, "context", context?context:st->id);
518     st->hash_context = simple_hash(st->context);
519
520     st->priority = config_get_number(st->id, "priority", priority);
521     st->enabled = enabled;
522
523     st->isdetail = 0;
524     st->debug = 0;
525
526     // if(!strcmp(st->id, "disk_util.dm-0")) {
527     //     st->debug = 1;
528     //     error("enabled debugging for '%s'", st->id);
529     // }
530     // else error("not enabled debugging for '%s'", st->id);
531
532     st->green = NAN;
533     st->red = NAN;
534
535     st->last_collected_time.tv_sec = 0;
536     st->last_collected_time.tv_usec = 0;
537     st->counter_done = 0;
538
539     st->gap_when_lost_iterations_above = (int) (
540             config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
541
542     avl_init_lock(&st->dimensions_index, rrddim_compare);
543     avl_init_lock(&st->variables_root_index, rrdvar_compare);
544
545     pthread_rwlock_init(&st->rwlock, NULL);
546     rrdhost_rwlock(&localhost);
547
548     if(name && *name) rrdset_set_name(st, name);
549     else rrdset_set_name(st, id);
550
551     {
552         char varvalue[CONFIG_MAX_VALUE + 1];
553         snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
554         st->title = config_get(st->id, "title", varvalue);
555     }
556
557     st->rrdfamily = rrdfamily_create(st->family);
558     st->rrdhost = &localhost;
559
560     st->next = localhost.rrdset_root;
561     localhost.rrdset_root = st;
562
563     if(health_enabled) {
564         rrdsetvar_create(st, "last_collected_t", RRDVAR_TYPE_TIME_T, &st->last_collected_time.tv_sec, 0);
565         rrdsetvar_create(st, "collected_total_raw", RRDVAR_TYPE_TOTAL, &st->last_collected_total, 0);
566         rrdsetvar_create(st, "green", RRDVAR_TYPE_CALCULATED, &st->green, 0);
567         rrdsetvar_create(st, "red", RRDVAR_TYPE_CALCULATED, &st->red, 0);
568         rrdsetvar_create(st, "update_every", RRDVAR_TYPE_INT, &st->update_every, 0);
569     }
570
571     rrdset_index_add(&localhost, st);
572
573     rrdsetcalc_link_matching(st);
574     rrdcalctemplate_link_matching(st);
575
576     rrdhost_unlock(&localhost);
577
578     return(st);
579 }
580
581 RRDDIM *rrddim_add(RRDSET *st, const char *id, const char *name, long multiplier, long divisor, int algorithm)
582 {
583     char filename[FILENAME_MAX + 1];
584     char fullfilename[FILENAME_MAX + 1];
585
586     char varname[CONFIG_MAX_NAME + 1];
587     RRDDIM *rd = NULL;
588     unsigned long size = sizeof(RRDDIM) + (st->entries * sizeof(storage_number));
589
590     debug(D_RRD_CALLS, "Adding dimension '%s/%s'.", st->id, id);
591
592     rrdset_strncpyz_name(filename, id, FILENAME_MAX);
593     snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename);
594     if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) rd = (RRDDIM *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 1);
595     if(rd) {
596         struct timeval now;
597         gettimeofday(&now, NULL);
598
599         if(strcmp(rd->magic, RRDDIMENSION_MAGIC) != 0) {
600             errno = 0;
601             info("Initializing file %s.", fullfilename);
602             bzero(rd, size);
603         }
604         else if(rd->memsize != size) {
605             errno = 0;
606             error("File %s does not have the desired size. Clearing it.", fullfilename);
607             bzero(rd, size);
608         }
609         else if(rd->multiplier != multiplier) {
610             errno = 0;
611             error("File %s does not have the same multiplier. Clearing it.", fullfilename);
612             bzero(rd, size);
613         }
614         else if(rd->divisor != divisor) {
615             errno = 0;
616             error("File %s does not have the same divisor. Clearing it.", fullfilename);
617             bzero(rd, size);
618         }
619         else if(rd->algorithm != algorithm) {
620             errno = 0;
621             error("File %s does not have the same algorithm. Clearing it.", fullfilename);
622             bzero(rd, size);
623         }
624         else if(rd->update_every != st->update_every) {
625             errno = 0;
626             error("File %s does not have the same refresh frequency. Clearing it.", fullfilename);
627             bzero(rd, size);
628         }
629         else if(usec_dt(&now, &rd->last_collected_time) > (rd->entries * rd->update_every * 1000000ULL)) {
630             errno = 0;
631             error("File %s is too old. Clearing it.", fullfilename);
632             bzero(rd, size);
633         }
634         else if(strcmp(rd->id, id) != 0) {
635             errno = 0;
636             error("File %s contents are not for dimension %s. Clearing it.", fullfilename, id);
637             // munmap(rd, size);
638             // rd = NULL;
639             bzero(rd, size);
640         }
641     }
642
643     if(rd) {
644         // we have a file mapped for rd
645         rd->mapped = rrd_memory_mode;
646         rd->flags = 0x00000000;
647         rd->variables = NULL;
648         rd->next = NULL;
649         rd->name = NULL;
650     }
651     else {
652         // if we didn't manage to get a mmap'd dimension, just create one
653
654         rd = callocz(1, size);
655         rd->mapped = RRD_MEMORY_MODE_RAM;
656     }
657     rd->memsize = size;
658
659     strcpy(rd->magic, RRDDIMENSION_MAGIC);
660     strcpy(rd->cache_filename, fullfilename);
661     strncpyz(rd->id, id, RRD_ID_LENGTH_MAX);
662     rd->hash = simple_hash(rd->id);
663
664     snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
665     rd->name = config_get(st->id, varname, (name && *name)?name:rd->id);
666
667     snprintfz(varname, CONFIG_MAX_NAME, "dim %s algorithm", rd->id);
668     rd->algorithm = rrddim_algorithm_id(config_get(st->id, varname, rrddim_algorithm_name(algorithm)));
669
670     snprintfz(varname, CONFIG_MAX_NAME, "dim %s multiplier", rd->id);
671     rd->multiplier = config_get_number(st->id, varname, multiplier);
672
673     snprintfz(varname, CONFIG_MAX_NAME, "dim %s divisor", rd->id);
674     rd->divisor = config_get_number(st->id, varname, divisor);
675     if(!rd->divisor) rd->divisor = 1;
676
677     rd->entries = st->entries;
678     rd->update_every = st->update_every;
679
680     // prevent incremental calculation spikes
681     rd->counter = 0;
682     rd->updated = 0;
683     rd->calculated_value = 0;
684     rd->last_calculated_value = 0;
685     rd->collected_value = 0;
686     rd->last_collected_value = 0;
687     rd->collected_volume = 0;
688     rd->stored_volume = 0;
689     rd->last_stored_value = 0;
690     rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
691     rd->last_collected_time.tv_sec = 0;
692     rd->last_collected_time.tv_usec = 0;
693     rd->rrdset = st;
694
695     // append this dimension
696     pthread_rwlock_wrlock(&st->rwlock);
697     if(!st->dimensions)
698         st->dimensions = rd;
699     else {
700         RRDDIM *td = st->dimensions;
701         for(; td->next; td = td->next) ;
702         td->next = rd;
703     }
704
705     if(health_enabled) {
706         rrddimvar_create(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, 0);
707         rrddimvar_create(rd, RRDVAR_TYPE_COLLECTED, NULL, "_raw", &rd->last_collected_value, 0);
708         rrddimvar_create(rd, RRDVAR_TYPE_TIME_T, NULL, "_last_collected_t", &rd->last_collected_time.tv_sec, 0);
709     }
710
711     pthread_rwlock_unlock(&st->rwlock);
712
713     rrddim_index_add(st, rd);
714
715     return(rd);
716 }
717
718 void rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name)
719 {
720     debug(D_RRD_CALLS, "rrddim_set_name() %s.%s", st->name, rd->name);
721
722     char varname[CONFIG_MAX_NAME + 1];
723     snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
724     config_set_default(st->id, varname, name);
725
726     rrddimvar_rename_all(rd);
727 }
728
729 void rrddim_free(RRDSET *st, RRDDIM *rd)
730 {
731     debug(D_RRD_CALLS, "rrddim_free() %s.%s", st->name, rd->name);
732
733     if(rd == st->dimensions)
734         st->dimensions = rd->next;
735     else {
736         RRDDIM *i;
737         for (i = st->dimensions; i && i->next != rd; i = i->next) ;
738
739         if (i && i->next == rd)
740             i->next = rd->next;
741         else
742             error("Request to free dimension '%s.%s' but it is not linked.", st->id, rd->name);
743     }
744     rd->next = NULL;
745
746     while(rd->variables)
747         rrddimvar_free(rd->variables);
748
749     rrddim_index_del(st, rd);
750
751     // free(rd->annotations);
752     if(rd->mapped == RRD_MEMORY_MODE_SAVE) {
753         debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
754         savememory(rd->cache_filename, rd, rd->memsize);
755
756         debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
757         munmap(rd, rd->memsize);
758     }
759     else if(rd->mapped == RRD_MEMORY_MODE_MAP) {
760         debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
761         munmap(rd, rd->memsize);
762     }
763     else {
764         debug(D_RRD_CALLS, "Removing dimension '%s'.", rd->name);
765         freez(rd);
766     }
767 }
768
769 void rrdset_free_all(void)
770 {
771     info("Freeing all memory...");
772
773     rrdhost_rwlock(&localhost);
774
775     RRDSET *st;
776     for(st = localhost.rrdset_root; st ;) {
777         RRDSET *next = st->next;
778
779         pthread_rwlock_wrlock(&st->rwlock);
780
781         while(st->variables)
782             rrdsetvar_free(st->variables);
783
784         while(st->alarms)
785             rrdsetcalc_unlink(st->alarms);
786
787         while(st->dimensions)
788             rrddim_free(st, st->dimensions);
789
790         rrdset_index_del(&localhost, st);
791
792         st->rrdfamily->use_count--;
793         if(!st->rrdfamily->use_count)
794             rrdfamily_free(st->rrdfamily);
795
796         pthread_rwlock_unlock(&st->rwlock);
797
798         if(st->mapped == RRD_MEMORY_MODE_SAVE) {
799             debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
800             savememory(st->cache_filename, st, st->memsize);
801
802             debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
803             munmap(st, st->memsize);
804         }
805         else if(st->mapped == RRD_MEMORY_MODE_MAP) {
806             debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
807             munmap(st, st->memsize);
808         }
809         else
810             freez(st);
811
812         st = next;
813     }
814     localhost.rrdset_root = NULL;
815
816     rrdhost_unlock(&localhost);
817
818     info("Memory cleanup completed...");
819 }
820
821 void rrdset_save_all(void) {
822     info("Saving database...");
823
824     RRDSET *st;
825     RRDDIM *rd;
826
827     rrdhost_rwlock(&localhost);
828     for(st = localhost.rrdset_root; st ; st = st->next) {
829         pthread_rwlock_wrlock(&st->rwlock);
830
831         if(st->mapped == RRD_MEMORY_MODE_SAVE) {
832             debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
833             savememory(st->cache_filename, st, st->memsize);
834         }
835
836         for(rd = st->dimensions; rd ; rd = rd->next) {
837             if(likely(rd->mapped == RRD_MEMORY_MODE_SAVE)) {
838                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
839                 savememory(rd->cache_filename, rd, rd->memsize);
840             }
841         }
842
843         pthread_rwlock_unlock(&st->rwlock);
844     }
845     rrdhost_unlock(&localhost);
846 }
847
848
849 RRDSET *rrdset_find(const char *id)
850 {
851     debug(D_RRD_CALLS, "rrdset_find() for chart %s", id);
852
853     RRDSET *st = rrdset_index_find(&localhost, id, 0);
854     return(st);
855 }
856
857 RRDSET *rrdset_find_bytype(const char *type, const char *id)
858 {
859     debug(D_RRD_CALLS, "rrdset_find_bytype() for chart %s.%s", type, id);
860
861     char buf[RRD_ID_LENGTH_MAX + 1];
862
863     strncpyz(buf, type, RRD_ID_LENGTH_MAX - 1);
864     strcat(buf, ".");
865     int len = (int) strlen(buf);
866     strncpyz(&buf[len], id, (size_t) (RRD_ID_LENGTH_MAX - len));
867
868     return(rrdset_find(buf));
869 }
870
871 RRDSET *rrdset_find_byname(const char *name)
872 {
873     debug(D_RRD_CALLS, "rrdset_find_byname() for chart %s", name);
874
875     RRDSET *st = rrdset_index_find_name(&localhost, name, 0);
876     return(st);
877 }
878
879 RRDDIM *rrddim_find(RRDSET *st, const char *id)
880 {
881     debug(D_RRD_CALLS, "rrddim_find() for chart %s, dimension %s", st->name, id);
882
883     return rrddim_index_find(st, id, 0);
884 }
885
886 int rrddim_hide(RRDSET *st, const char *id)
887 {
888     debug(D_RRD_CALLS, "rrddim_hide() for chart %s, dimension %s", st->name, id);
889
890     RRDDIM *rd = rrddim_find(st, id);
891     if(unlikely(!rd)) {
892         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
893         return 1;
894     }
895
896     rd->flags |= RRDDIM_FLAG_HIDDEN;
897     return 0;
898 }
899
900 int rrddim_unhide(RRDSET *st, const char *id)
901 {
902     debug(D_RRD_CALLS, "rrddim_unhide() for chart %s, dimension %s", st->name, id);
903
904     RRDDIM *rd = rrddim_find(st, id);
905     if(unlikely(!rd)) {
906         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
907         return 1;
908     }
909
910     if(rd->flags & RRDDIM_FLAG_HIDDEN) rd->flags ^= RRDDIM_FLAG_HIDDEN;
911     return 0;
912 }
913
914 collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value)
915 {
916     debug(D_RRD_CALLS, "rrddim_set_by_pointer() for chart %s, dimension %s, value " COLLECTED_NUMBER_FORMAT, st->name, rd->name, value);
917
918     gettimeofday(&rd->last_collected_time, NULL);
919     rd->collected_value = value;
920     rd->updated = 1;
921     rd->counter++;
922
923     return rd->last_collected_value;
924 }
925
926 collected_number rrddim_set(RRDSET *st, const char *id, collected_number value)
927 {
928     RRDDIM *rd = rrddim_find(st, id);
929     if(unlikely(!rd)) {
930         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
931         return 0;
932     }
933
934     return rrddim_set_by_pointer(st, rd, value);
935 }
936
937 void rrdset_next_usec(RRDSET *st, unsigned long long microseconds)
938 {
939     if(!microseconds) rrdset_next(st);
940     else {
941         debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
942
943         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
944         st->usec_since_last_update = microseconds;
945     }
946 }
947
948 void rrdset_next(RRDSET *st)
949 {
950     unsigned long long microseconds = 0;
951
952     if(likely(st->last_collected_time.tv_sec)) {
953         struct timeval now;
954         gettimeofday(&now, NULL);
955         microseconds = usec_dt(&now, &st->last_collected_time);
956     }
957     // prevent infinite loop
958     else microseconds = st->update_every * 1000000ULL;
959
960     rrdset_next_usec(st, microseconds);
961 }
962
963 void rrdset_next_plugins(RRDSET *st)
964 {
965     rrdset_next(st);
966 }
967
// Complete one collection iteration of the chart st, using the values its
// dimensions received since the previous call (rd->collected_value, rd->updated).
//
// In order, the function:
//  - returns 0 at once when netdata_exit is set;
//  - disables thread cancellation and takes a read lock on st->rwlock;
//  - resets the chart (rrdset_reset) when usec_since_last_update is longer than
//    the whole history (entries * update_every seconds);
//  - initializes st->last_collected_time on the first call, or advances it
//    by st->usec_since_last_update otherwise;
//  - fakes st->last_updated when it was never set, and resets the chart again
//    when last_updated is older than the whole history;
//  - computes rd->calculated_value for every updated dimension, per rd->algorithm;
//  - for every update_every boundary up to last_collected_time, interpolates one
//    value per dimension and writes it to rd->values[st->current_entry];
//  - rolls collected/calculated values over to their last_* fields and clears
//    the per-iteration fields of the updated dimensions;
//  - when rrd_delete_unupdated_dimensions is non-zero, frees the dimensions
//    that were not collected recently (under a write lock);
//  - releases the lock and restores the thread cancel state.
//
// Returns st->usec_since_last_update (possibly overwritten by one of the resets).
968 unsigned long long rrdset_done(RRDSET *st)
969 {
970     if(unlikely(netdata_exit)) return 0;
971
972     debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
973
974     RRDDIM *rd, *last;
        // store_this_entry: cleared on first/reset iterations, so nothing is written to the storage
        // first_entry: only used to silence the "same interpolation point" debug message
975     int oldstate, store_this_entry = 1, first_entry = 0;
976     unsigned long long last_ut, now_ut, next_ut, stored_entries = 0;
977
        // keep the thread from being cancelled while the chart is being modified
978     if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0))
979         error("Cannot set pthread cancel state to DISABLE.");
980
981     // a read lock is OK here
982     pthread_rwlock_rdlock(&st->rwlock);
983
984     // enable the chart, if it was disabled
        // (done only when rrd_delete_unupdated_dimensions is non-zero;
        //  the removal code at the end of this function is what disables charts)
985     if(unlikely(rrd_delete_unupdated_dimensions) && !st->enabled)
986         st->enabled = 1;
987
988     // check if the chart has a long time to be updated
989     if(unlikely(st->usec_since_last_update > st->entries * st->update_every * 1000000ULL)) {
990         info("%s: took too long to be updated (%0.3Lf secs). Reseting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
991         rrdset_reset(st);
992         st->usec_since_last_update = st->update_every * 1000000ULL;
993         first_entry = 1;
994     }
995     if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
996
997     // set last_collected_time
998     if(unlikely(!st->last_collected_time.tv_sec)) {
999         // it is the first entry
1000         // set the last_collected_time to now
1001         gettimeofday(&st->last_collected_time, NULL);
1002
1003         // the first entry should not be stored
1004         store_this_entry = 0;
1005         first_entry = 1;
1006
1007         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
1008     }
1009     else {
1010         // it is not the first entry
1011         // calculate the proper last_collected_time, using usec_since_last_update
1012         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec + st->usec_since_last_update;
1013         st->last_collected_time.tv_sec = (time_t) (ut / 1000000ULL);
1014         st->last_collected_time.tv_usec = (suseconds_t) (ut % 1000000ULL);
1015     }
1016
1017     // if this set has not been updated in the past
1018     // we fake the last_update time to be = now - usec_since_last_update
1019     if(unlikely(!st->last_updated.tv_sec)) {
1020         // it has never been updated before
1021         // set a fake last_updated, in the past using usec_since_last_update
1022         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
1023         st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
1024         st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
1025
1026         // the first entry should not be stored
1027         store_this_entry = 0;
1028         first_entry = 1;
1029
1030         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
1031     }
1032
1033     // check if we will re-write the entire data set
         // (the distance between last_updated and last_collected_time is longer than the whole history)
1034     if(unlikely(usec_dt(&st->last_collected_time, &st->last_updated) > st->update_every * st->entries * 1000000ULL)) {
1035         info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Reseting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
1036         rrdset_reset(st);
1037
1038         st->usec_since_last_update = st->update_every * 1000000ULL;
1039
1040         gettimeofday(&st->last_collected_time, NULL);
1041
1042         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
1043         st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
1044         st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
1045
1046         // the first entry should not be stored
1047         store_this_entry = 0;
1048         first_entry = 1;
1049     }
1050
1051     // these are the 3 variables that will help us in interpolation
1052     // last_ut = the last time we added a value to the storage
1053     //  now_ut = the time the current value is taken at
1054     // next_ut = the time of the next interpolation point
         // (next_ut drops the microseconds of last_updated, so it falls on a whole second)
1055     last_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec;
1056     now_ut  = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec;
1057     next_ut = (st->last_updated.tv_sec + st->update_every) * 1000000ULL;
1058
1059     if(unlikely(!first_entry && now_ut < next_ut)) {
1060         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
1061     }
1062
1063     if(unlikely(st->debug)) {
1064         debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
1065         debug(D_RRD_STATS, "%s: now  ut = %0.3Lf (current update time)", st->name, (long double)now_ut/1000000.0);
1066         debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
1067     }
1068
         // the very first completed iteration of a chart (counter_done == 0) is never stored
1069     if(unlikely(!st->counter_done)) {
1070         store_this_entry = 0;
1071         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
1072     }
1073     st->counter_done++;
1074
1075     // calculate totals and count the dimensions
         // NOTE(review): the 'dimensions' count is not read again within this function
1076     int dimensions;
1077     st->collected_total = 0;
1078     for( rd = st->dimensions, dimensions = 0 ; likely(rd) ; rd = rd->next, dimensions++ )
1079         if(likely(rd->updated)) st->collected_total += rd->collected_value;
1080
         // flags packed together with every value stored for the next point;
         // switched to SN_EXISTS_RESET below when a counter reset/overflow is detected
1081     uint32_t storage_flags = SN_EXISTS;
1082
1083     // process all dimensions to calculate their values
1084     // based on the collected figures only
1085     // at this stage we do not interpolate anything
1086     for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1087
             // dimensions not collected in this iteration contribute a zero
1088         if(unlikely(!rd->updated)) {
1089             rd->calculated_value = 0;
1090             continue;
1091         }
1092
1093         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: START "
1094             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1095             " collected_value = " COLLECTED_NUMBER_FORMAT
1096             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1097             " calculated_value = " CALCULATED_NUMBER_FORMAT
1098             , st->id, rd->name
1099             , rd->last_collected_value
1100             , rd->collected_value
1101             , rd->last_calculated_value
1102             , rd->calculated_value
1103             );
1104
1105         switch(rd->algorithm) {
                 // the collected value as is, scaled by multiplier / divisor
1106             case RRDDIM_ABSOLUTE:
1107                 rd->calculated_value = (calculated_number)rd->collected_value
1108                     * (calculated_number)rd->multiplier
1109                     / (calculated_number)rd->divisor;
1110
1111                 if(unlikely(st->debug))
1112                     debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
1113                         CALCULATED_NUMBER_FORMAT " = "
1114                         COLLECTED_NUMBER_FORMAT
1115                         " * " CALCULATED_NUMBER_FORMAT
1116                         " / " CALCULATED_NUMBER_FORMAT
1117                         , st->id, rd->name
1118                         , rd->calculated_value
1119                         , rd->collected_value
1120                         , (calculated_number)rd->multiplier
1121                         , (calculated_number)rd->divisor
1122                         );
1123                 break;
1124
1125             case RRDDIM_PCENT_OVER_ROW_TOTAL:
1126                 if(unlikely(!st->collected_total))
1127                     rd->calculated_value = 0;
1128                 else
1129                     // the percentage of the current value
1130                     // over the total of all dimensions
1131                     rd->calculated_value =
1132                           (calculated_number)100
1133                         * (calculated_number)rd->collected_value
1134                         / (calculated_number)st->collected_total;
1135
1136                 if(unlikely(st->debug))
1137                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
1138                         CALCULATED_NUMBER_FORMAT " = 100"
1139                         " * " COLLECTED_NUMBER_FORMAT
1140                         " / " COLLECTED_NUMBER_FORMAT
1141                         , st->id, rd->name
1142                         , rd->calculated_value
1143                         , rd->collected_value
1144                         , st->collected_total
1145                         );
1146                 break;
1147
1148             case RRDDIM_INCREMENTAL:
                     // a difference needs two collected values; skip until the second one arrives
1149                 if(unlikely(rd->counter <= 1)) {
1150                     rd->calculated_value = 0;
1151                     continue;
1152                 }
1153
1154                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1155                 // to reset the calculation (it will give zero as the calculation for this second)
1156                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1157                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1158                             , st->name, rd->name
1159                             , rd->last_collected_value
1160                             , rd->collected_value);
1161                     if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1162                     rd->last_collected_value = rd->collected_value;
1163                 }
1164
1165                 rd->calculated_value =
1166                       (calculated_number)(rd->collected_value - rd->last_collected_value)
1167                     * (calculated_number)rd->multiplier
1168                     / (calculated_number)rd->divisor;
1169
1170                 if(unlikely(st->debug))
1171                     debug(D_RRD_STATS, "%s/%s: CALC INC PRE "
1172                         CALCULATED_NUMBER_FORMAT " = ("
1173                         COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
1174                         ")"
1175                         " * " CALCULATED_NUMBER_FORMAT
1176                         " / " CALCULATED_NUMBER_FORMAT
1177                         , st->id, rd->name
1178                         , rd->calculated_value
1179                         , rd->collected_value, rd->last_collected_value
1180                         , (calculated_number)rd->multiplier
1181                         , (calculated_number)rd->divisor
1182                         );
1183                 break;
1184
1185             case RRDDIM_PCENT_OVER_DIFF_TOTAL:
                     // a difference needs two collected values; skip until the second one arrives
1186                 if(unlikely(rd->counter <= 1)) {
1187                     rd->calculated_value = 0;
1188                     continue;
1189                 }
1190
1191                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1192                 // to reset the calculation (it will give zero as the calculation for this second)
1193                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1194                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1195                     , st->name, rd->name
1196                     , rd->last_collected_value
1197                     , rd->collected_value);
1198                     if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1199                     rd->last_collected_value = rd->collected_value;
1200                 }
1201
1202                 // the percentage of the current increment
1203                 // over the increment of all dimensions together
1204                 if(unlikely(st->collected_total == st->last_collected_total))
1205                     rd->calculated_value = 0;
1206                 else
1207                     rd->calculated_value =
1208                           (calculated_number)100
1209                         * (calculated_number)(rd->collected_value - rd->last_collected_value)
1210                         / (calculated_number)(st->collected_total - st->last_collected_total);
1211
1212                 if(unlikely(st->debug))
1213                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
1214                         CALCULATED_NUMBER_FORMAT " = 100"
1215                         " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1216                         " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1217                         , st->id, rd->name
1218                         , rd->calculated_value
1219                         , rd->collected_value, rd->last_collected_value
1220                         , st->collected_total, st->last_collected_total
1221                         );
1222                 break;
1223
1224             default:
1225                 // make the default zero, to make sure
1226                 // it gets noticed when we add new types
1227                 rd->calculated_value = 0;
1228
1229                 if(unlikely(st->debug))
1230                     debug(D_RRD_STATS, "%s/%s: CALC "
1231                         CALCULATED_NUMBER_FORMAT " = 0"
1232                         , st->id, rd->name
1233                         , rd->calculated_value
1234                         );
1235                 break;
1236         }
1237
1238         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: PHASE2 "
1239             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1240             " collected_value = " COLLECTED_NUMBER_FORMAT
1241             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1242             " calculated_value = " CALCULATED_NUMBER_FORMAT
1243             , st->id, rd->name
1244             , rd->last_collected_value
1245             , rd->collected_value
1246             , rd->last_calculated_value
1247             , rd->calculated_value
1248             );
1249
1250     }
1251
1252     // at this point we have all the calculated values ready
1253     // it is now time to interpolate values on a second boundary
1254
         // first_ut keeps the starting point, since last_ut moves forward inside the loop
         // iterations counts down with every point; the non-incremental algorithms
         // treat iterations == 1 as the last point and store the calculated value as is
1255     unsigned long long first_ut = last_ut;
1256     long long iterations = (now_ut - last_ut) / (st->update_every * 1000000ULL);
1257     if((now_ut % (st->update_every * 1000000ULL)) == 0) iterations++;
1258
         // one pass per interpolation point that is not after the current collection time
1259     for( ; likely(next_ut <= now_ut) ; next_ut += st->update_every * 1000000ULL, iterations-- ) {
1260 #ifdef NETDATA_INTERNAL_CHECKS
1261         if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_ut = %llu, next_ut = %llu, now_ut = %llu", st->name, first_ut, last_ut, next_ut, now_ut); }
1262 #endif
1263
1264         if(unlikely(st->debug)) {
1265             debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
1266             debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
1267         }
1268
1269         st->last_updated.tv_sec = (time_t) (next_ut / 1000000ULL);
1270         st->last_updated.tv_usec = 0;
1271
1272         for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1273             calculated_number new_value;
1274
1275             switch(rd->algorithm) {
1276                 case RRDDIM_INCREMENTAL:
                         // take the share of the increment that is proportional to the time
                         // from last_ut to next_ut, over the time from last_ut to now_ut
1277                     new_value = (calculated_number)
1278                         (      rd->calculated_value
1279                             * (calculated_number)(next_ut - last_ut)
1280                             / (calculated_number)(now_ut - last_ut)
1281                         );
1282
1283                     if(unlikely(st->debug))
1284                         debug(D_RRD_STATS, "%s/%s: CALC2 INC "
1285                             CALCULATED_NUMBER_FORMAT " = "
1286                             CALCULATED_NUMBER_FORMAT
1287                             " * %llu"
1288                             " / %llu"
1289                             , st->id, rd->name
1290                             , new_value
1291                             , rd->calculated_value
1292                             , (next_ut - last_ut)
1293                             , (now_ut - last_ut)
1294                             );
1295
                         // the rest of the increment stays in calculated_value for the following points;
                         // what was left over from the previous call (last_calculated_value) is added
                         // to this point once, and the sum is divided by update_every
1296                     rd->calculated_value -= new_value;
1297                     new_value += rd->last_calculated_value;
1298                     rd->last_calculated_value = 0;
1299                     new_value /= (calculated_number)st->update_every;
1300                     break;
1301
1302                 case RRDDIM_ABSOLUTE:
1303                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
1304                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1305                 default:
1306                     if(iterations == 1) {
1307                         // this is the last iteration
1308                         // do not interpolate
1309                         // just show the calculated value
1310
1311                         new_value = rd->calculated_value;
1312                     }
1313                     else {
1314                         // we have missed an update
1315                         // interpolate in the middle values
                             // (linearly, from last_calculated_value at first_ut to calculated_value at now_ut)
1316
1317                         new_value = (calculated_number)
1318                             (   (     (rd->calculated_value - rd->last_calculated_value)
1319                                     * (calculated_number)(next_ut - first_ut)
1320                                     / (calculated_number)(now_ut - first_ut)
1321                                 )
1322                                 +  rd->last_calculated_value
1323                             );
1324
1325                         if(unlikely(st->debug))
1326                             debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
1327                                 CALCULATED_NUMBER_FORMAT " = ((("
1328                                 "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
1329                                 " * %llu"
1330                                 " / %llu) + " CALCULATED_NUMBER_FORMAT
1331                                 , st->id, rd->name
1332                                 , new_value
1333                                 , rd->calculated_value, rd->last_calculated_value
1334                                 , (next_ut - first_ut)
1335                                 , (now_ut - first_ut), rd->last_calculated_value
1336                                 );
1337
1338                         // this is wrong
1339                         // it fades the value towards the target
1340                         // while we know the calculated value is different
1341                         // if(likely(next_ut + st->update_every * 1000000ULL > now_ut)) rd->calculated_value = new_value;
1342                     }
1343                     break;
1344             }
1345
                 // nothing is written for this dimension while storing is suppressed;
                 // the chart counter and current_entry below still advance
1346             if(unlikely(!store_this_entry)) {
1347                 // store_this_entry = 1;
1348                 continue;
1349             }
1350
                 // store a real value only for dimensions collected in this iteration, that have
                 // been collected more than once, and while the remaining iterations are below
                 // the chart's gap threshold; otherwise store a "not exists" point
1351             if(likely(rd->updated && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
1352                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
1353                 rd->last_stored_value = new_value;
1354
1355                 if(unlikely(st->debug))
1356                     debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
1357                         CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
1358                         , st->id, rd->name
1359                         , st->current_entry
1360                         , unpack_storage_number(rd->values[st->current_entry]), new_value
1361                         );
1362             }
1363             else {
1364                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
1365                         , st->id, rd->name
1366                         , st->current_entry
1367                         );
1368                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
1369                 rd->last_stored_value = NAN;
1370             }
1371
1372             stored_entries++;
1373
                 // note: collected_volume and stored_volume are accumulated only while st->debug is set
1374             if(unlikely(st->debug)) {
1375                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
1376                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
1377                 calculated_number accuracy = accuracy_loss(t1, t2);
1378                 debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
1379                         , st->id, rd->name
1380                         , st->current_entry
1381                         , t2
1382                         , get_storage_number_flags(rd->values[st->current_entry])
1383                         , t1
1384                         , accuracy
1385                         , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1386                         );
1387
1388                 rd->collected_volume += t1;
1389                 rd->stored_volume += t2;
1390                 accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
1391                 debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated  = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
1392                         , st->id, rd->name
1393                         , st->current_entry
1394                         , rd->stored_volume
1395                         , rd->collected_volume
1396                         , accuracy
1397                         , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1398                         );
1399
1400             }
1401         }
1402         // reset the storage flags for the next point, if any;
1403         storage_flags = SN_EXISTS;
1404
             // advance the round robin position, wrapping to 0 at st->entries
1405         st->counter++;
1406         st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
1407         last_ut = next_ut;
1408     }
1409
1410     // align next interpolation to last collection point
         // (skipped when storing was enabled but no value was stored in this call)
1411     if(likely(stored_entries || !store_this_entry)) {
1412         st->last_updated.tv_sec = st->last_collected_time.tv_sec;
1413         st->last_updated.tv_usec = st->last_collected_time.tv_usec;
1414         st->last_collected_total  = st->collected_total;
1415     }
1416
         // roll the values of this iteration over to the last_* fields
         // and clear the per-iteration fields of every updated dimension
1417     for( rd = st->dimensions; likely(rd) ; rd = rd->next ) {
1418         if(unlikely(!rd->updated)) continue;
1419
1420         if(likely(stored_entries || !store_this_entry)) {
1421             if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
1422             rd->last_collected_value = rd->collected_value;
1423
1424             if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
1425             rd->last_calculated_value = rd->calculated_value;
1426         }
1427
1428         rd->calculated_value = 0;
1429         rd->collected_value = 0;
1430         rd->updated = 0;
1431
1432         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: END "
1433             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1434             " collected_value = " COLLECTED_NUMBER_FORMAT
1435             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1436             " calculated_value = " CALCULATED_NUMBER_FORMAT
1437             , st->id, rd->name
1438             , rd->last_collected_value
1439             , rd->collected_value
1440             , rd->last_calculated_value
1441             , rd->calculated_value
1442             );
1443     }
1444
1445     // ALL DONE ABOUT THE DATA UPDATE
1446     // --------------------------------------------------------------------
1447
1448     // find if there are any obsolete dimensions (not updated recently)
1449     if(unlikely(rrd_delete_unupdated_dimensions)) {
1450
             // first pass, under the read lock: stop at the first obsolete dimension, if any
1451         for( rd = st->dimensions; likely(rd) ; rd = rd->next )
1452             if((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)
1453                 break;
1454
1455         if(unlikely(rd)) {
1456             // there is dimension to free
1457             // upgrade our read lock to a write lock
                 // NOTE(review): this is unlock + wrlock, not an atomic upgrade, so the chart
                 // is unlocked for a moment; the list is scanned again below under the write lock
1458             pthread_rwlock_unlock(&st->rwlock);
1459             pthread_rwlock_wrlock(&st->rwlock);
1460
1461             for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
1462                 // remove it only if it has not been collected for more than rrd_delete_unupdated_dimensions * update_every seconds
1463
1464                 if(unlikely((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)) {
1465                     info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
1466
                         // unlink rd from the list (it is either the head or follows 'last'),
                         // free it, and continue from the dimension that took its place
1467                     if(unlikely(!last)) {
1468                         st->dimensions = rd->next;
1469                         rd->next = NULL;
1470                         rrddim_free(st, rd);
1471                         rd = st->dimensions;
1472                         continue;
1473                     }
1474                     else {
1475                         last->next = rd->next;
1476                         rd->next = NULL;
1477                         rrddim_free(st, rd);
1478                         rd = last->next;
1479                         continue;
1480                     }
1481                 }
1482
1483                 last = rd;
1484                 rd = rd->next;
1485             }
1486
1487             if(unlikely(!st->dimensions)) {
1488                 info("Disabling chart %s (%s) since it does not have any dimensions", st->name, st->id);
1489                 st->enabled = 0;
1490             }
1491         }
1492     }
1493
1494     pthread_rwlock_unlock(&st->rwlock);
1495
1496     if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0))
1497         error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
1498
         // NOTE(review): st is read here after its lock has been released
1499     return(st->usec_since_last_update);
1500 }