arthur.barton.de Git - netdata.git/blob - src/rrd.c
preparation for multi-host internal database
[netdata.git] / src / rrd.c
1 #ifdef HAVE_CONFIG_H
2 #include <config.h>
3 #endif
4 #include <stddef.h>
5 #include <string.h>
6 #include <unistd.h>
7 #include <time.h>
8 #include <sys/time.h>
9 #include <sys/mman.h>
10 #include <pthread.h>
11 #include <errno.h>
12 #include <ctype.h>
13 #include <sys/stat.h>
14 #include <sys/types.h>
15 #include <stdlib.h>
16
17 #include "common.h"
18 #include "log.h"
19 #include "appconfig.h"
20
21 #include "main.h"
22 #include "rrd.h"
23
24 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
25
26 // ----------------------------------------------------------------------------
27 // globals
28
29 // if not zero it gives the time (in seconds) to remove un-updated dimensions
30 // DO NOT ENABLE
31 // if dimensions are removed, the chart generation will have to run again
32 int rrd_delete_unupdated_dimensions = 0;
33
34 int rrd_update_every = UPDATE_EVERY;
35 int rrd_default_history_entries = RRD_DEFAULT_HISTORY_ENTRIES;
36 int rrd_memory_mode = RRD_MEMORY_MODE_SAVE;
37
38 static int rrdset_compare(void* a, void* b);
39 static int rrdset_compare_name(void* a, void* b);
40
41 RRDHOST localhost = {
42                 .hostname = "localhost",
43                 .rrdset_root = NULL,
44                 .rrdset_root_rwlock = PTHREAD_RWLOCK_INITIALIZER,
45         .rrdset_root_index = {
46             { NULL, rrdset_compare },
47             AVL_LOCK_INITIALIZER
48         },
49         .rrdset_root_index_name = {
50             { NULL, rrdset_compare_name },
51             AVL_LOCK_INITIALIZER
52         }
53 };
54
55 // ----------------------------------------------------------------------------
56 // RRDSET index
57
58 static int rrdset_compare(void* a, void* b) {
59         if(((RRDSET *)a)->hash < ((RRDSET *)b)->hash) return -1;
60         else if(((RRDSET *)a)->hash > ((RRDSET *)b)->hash) return 1;
61         else return strcmp(((RRDSET *)a)->id, ((RRDSET *)b)->id);
62 }
63
// insert / remove a chart in the id index of a host
// (the RRDSET pointer is cast to avl * to be used as the tree node)
#define rrdset_index_add(host, st) avl_insert_lock(&(host)->rrdset_root_index, (avl *)(st))
#define rrdset_index_del(host, st) avl_remove_lock(&(host)->rrdset_root_index, (avl *)(st))
66
67 static RRDSET *rrdset_index_find(RRDHOST *host, const char *id, uint32_t hash) {
68         RRDSET tmp;
69         strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
70         tmp.hash = (hash)?hash:simple_hash(tmp.id);
71
72         return (RRDSET *)avl_search_lock(&(host->rrdset_root_index), (avl *) &tmp);
73 }
74
75 // ----------------------------------------------------------------------------
76 // RRDSET name index
77
// Convert a pointer to the avlname member of an RRDSET back to a pointer to
// the RRDSET that contains it.
// The subtraction is done on a char * so that the offset is counted in bytes
// without relying on the GNU extension that allows arithmetic on void *
// (the callers in this file pass void * values).
#define rrdset_from_avlname(avlname_ptr) ((RRDSET *)((char *)(avlname_ptr) - offsetof(RRDSET, avlname)))
79
80 static int rrdset_compare_name(void* a, void* b) {
81         RRDSET *A = rrdset_from_avlname(a);
82         RRDSET *B = rrdset_from_avlname(b);
83
84         // fprintf(stderr, "COMPARING: %s with %s\n", A->name, B->name);
85
86         if(A->hash_name < B->hash_name) return -1;
87         else if(A->hash_name > B->hash_name) return 1;
88         else return strcmp(A->name, B->name);
89 }
90
91 RRDSET *rrdset_index_add_name(RRDHOST *host, RRDSET *st) {
92         // fprintf(stderr, "ADDING: %s (name: %s)\n", st->id, st->name);
93         return (RRDSET *)avl_insert_lock(&host->rrdset_root_index_name, (avl *) (&st->avlname));
94 }
95
// Remove a chart from the name index of a host.
// Both macro arguments are parenthesized so that any expression can be
// passed as the chart pointer.
#define rrdset_index_del_name(host, st) avl_remove_lock(&((host)->rrdset_root_index_name), (avl *)(&(st)->avlname))
97
98 static RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, uint32_t hash) {
99         void *result = NULL;
100         RRDSET tmp;
101         tmp.name = name;
102         tmp.hash_name = (hash)?hash:simple_hash(tmp.name);
103
104         // fprintf(stderr, "SEARCHING: %s\n", name);
105         result = avl_search_lock(&host->rrdset_root_index_name, (avl *) (&(tmp.avlname)));
106         if(result) {
107                 RRDSET *st = rrdset_from_avlname(result);
108                 if(strcmp(st->magic, RRDSET_MAGIC))
109                         error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name);
110
111                 // fprintf(stderr, "FOUND: %s\n", name);
112                 return rrdset_from_avlname(result);
113         }
114         // fprintf(stderr, "NOT FOUND: %s\n", name);
115         return NULL;
116 }
117
118
119 // ----------------------------------------------------------------------------
120 // RRDDIM index
121
122 static int rrddim_compare(void* a, void* b) {
123         if(((RRDDIM *)a)->hash < ((RRDDIM *)b)->hash) return -1;
124         else if(((RRDDIM *)a)->hash > ((RRDDIM *)b)->hash) return 1;
125         else return strcmp(((RRDDIM *)a)->id, ((RRDDIM *)b)->id);
126 }
127
// insert / remove a dimension in the dimensions index of a chart
// (the RRDDIM pointer is cast to avl * to be used as the tree node)
#define rrddim_index_add(st, rd) avl_insert_lock(&(st)->dimensions_index, (avl *)(rd))
#define rrddim_index_del(st, rd) avl_remove_lock(&(st)->dimensions_index, (avl *)(rd))
130
131 static RRDDIM *rrddim_index_find(RRDSET *st, const char *id, uint32_t hash) {
132         RRDDIM tmp;
133         strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
134         tmp.hash = (hash)?hash:simple_hash(tmp.id);
135
136         return (RRDDIM *)avl_search_lock(&(st->dimensions_index), (avl *) &tmp);
137 }
138
139 // ----------------------------------------------------------------------------
140 // chart types
141
142 int rrdset_type_id(const char *name)
143 {
144         if(unlikely(strcmp(name, RRDSET_TYPE_AREA_NAME) == 0)) return RRDSET_TYPE_AREA;
145         else if(unlikely(strcmp(name, RRDSET_TYPE_STACKED_NAME) == 0)) return RRDSET_TYPE_STACKED;
146         else if(unlikely(strcmp(name, RRDSET_TYPE_LINE_NAME) == 0)) return RRDSET_TYPE_LINE;
147         return RRDSET_TYPE_LINE;
148 }
149
150 const char *rrdset_type_name(int chart_type)
151 {
152         static char line[] = RRDSET_TYPE_LINE_NAME;
153         static char area[] = RRDSET_TYPE_AREA_NAME;
154         static char stacked[] = RRDSET_TYPE_STACKED_NAME;
155
156         switch(chart_type) {
157                 case RRDSET_TYPE_LINE:
158                         return line;
159
160                 case RRDSET_TYPE_AREA:
161                         return area;
162
163                 case RRDSET_TYPE_STACKED:
164                         return stacked;
165         }
166         return line;
167 }
168
169 // ----------------------------------------------------------------------------
170 // load / save
171
172 const char *rrd_memory_mode_name(int id)
173 {
174         static const char ram[] = RRD_MEMORY_MODE_RAM_NAME;
175         static const char map[] = RRD_MEMORY_MODE_MAP_NAME;
176         static const char save[] = RRD_MEMORY_MODE_SAVE_NAME;
177
178         switch(id) {
179                 case RRD_MEMORY_MODE_RAM:
180                         return ram;
181
182                 case RRD_MEMORY_MODE_MAP:
183                         return map;
184
185                 case RRD_MEMORY_MODE_SAVE:
186                 default:
187                         return save;
188         }
189
190         return save;
191 }
192
193 int rrd_memory_mode_id(const char *name)
194 {
195         if(unlikely(!strcmp(name, RRD_MEMORY_MODE_RAM_NAME)))
196                 return RRD_MEMORY_MODE_RAM;
197         else if(unlikely(!strcmp(name, RRD_MEMORY_MODE_MAP_NAME)))
198                 return RRD_MEMORY_MODE_MAP;
199
200         return RRD_MEMORY_MODE_SAVE;
201 }
202
203 // ----------------------------------------------------------------------------
204 // algorithms types
205
206 int rrddim_algorithm_id(const char *name)
207 {
208         if(strcmp(name, RRDDIM_INCREMENTAL_NAME) == 0)                  return RRDDIM_INCREMENTAL;
209         if(strcmp(name, RRDDIM_ABSOLUTE_NAME) == 0)                     return RRDDIM_ABSOLUTE;
210         if(strcmp(name, RRDDIM_PCENT_OVER_ROW_TOTAL_NAME) == 0)                 return RRDDIM_PCENT_OVER_ROW_TOTAL;
211         if(strcmp(name, RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME) == 0)        return RRDDIM_PCENT_OVER_DIFF_TOTAL;
212         return RRDDIM_ABSOLUTE;
213 }
214
215 const char *rrddim_algorithm_name(int chart_type)
216 {
217         static char absolute[] = RRDDIM_ABSOLUTE_NAME;
218         static char incremental[] = RRDDIM_INCREMENTAL_NAME;
219         static char percentage_of_absolute_row[] = RRDDIM_PCENT_OVER_ROW_TOTAL_NAME;
220         static char percentage_of_incremental_row[] = RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME;
221
222         switch(chart_type) {
223                 case RRDDIM_ABSOLUTE:
224                         return absolute;
225
226                 case RRDDIM_INCREMENTAL:
227                         return incremental;
228
229                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
230                         return percentage_of_absolute_row;
231
232                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
233                         return percentage_of_incremental_row;
234         }
235         return absolute;
236 }
237
238 // ----------------------------------------------------------------------------
239 // chart names
240
// Copy a string, sanitizing it so that it can be used as a chart name.
// Every character that is neither a dot nor alphanumeric is replaced by an
// underscore.
//
// to:     destination; must have room for length + 1 bytes, because up to
//         length characters are copied and a terminating NUL is always added
// from:   NUL terminated source string
// length: maximum number of characters to copy (excluding the NUL)
//
// Returns to.
char *rrdset_strncpyz_name(char *to, const char *from, size_t length)
{
    char *p = to;

    for( ; length && *from ; length--, from++) {
        // <ctype.h> functions require a value representable as unsigned char;
        // passing a negative plain char (bytes >= 0x80) is undefined behavior
        unsigned char c = (unsigned char)*from;

        *p++ = (c == '.' || isalnum(c)) ? (char)c : '_';
    }

    *p = '\0';

    return to;
}
256
257 void rrdset_set_name(RRDSET *st, const char *name)
258 {
259         debug(D_RRD_CALLS, "rrdset_set_name() old: %s, new: %s", st->name, name);
260
261         if(st->name) rrdset_index_del_name(&localhost, st);
262
263         char b[CONFIG_MAX_VALUE + 1];
264         char n[RRD_ID_LENGTH_MAX + 1];
265
266         snprintfz(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
267         rrdset_strncpyz_name(b, n, CONFIG_MAX_VALUE);
268         st->name = config_get(st->id, "name", b);
269         st->hash_name = simple_hash(st->name);
270
271         rrdset_index_add_name(&localhost, st);
272 }
273
274 // ----------------------------------------------------------------------------
275 // cache directory
276
277 char *rrdset_cache_dir(const char *id)
278 {
279         char *ret = NULL;
280
281         static char *cache_dir = NULL;
282         if(!cache_dir) {
283                 cache_dir = config_get("global", "cache directory", CACHE_DIR);
284                 int r = mkdir(cache_dir, 0755);
285                 if(r != 0 && errno != EEXIST)
286                         error("Cannot create directory '%s'", cache_dir);
287         }
288
289         char b[FILENAME_MAX + 1];
290         char n[FILENAME_MAX + 1];
291         rrdset_strncpyz_name(b, id, FILENAME_MAX);
292
293         snprintfz(n, FILENAME_MAX, "%s/%s", cache_dir, b);
294         ret = config_get(id, "cache directory", n);
295
296         if(rrd_memory_mode == RRD_MEMORY_MODE_MAP || rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
297                 int r = mkdir(ret, 0775);
298                 if(r != 0 && errno != EEXIST)
299                         error("Cannot create directory '%s'", ret);
300         }
301
302         return ret;
303 }
304
305 // ----------------------------------------------------------------------------
306 // core functions
307
308 void rrdset_reset(RRDSET *st)
309 {
310         debug(D_RRD_CALLS, "rrdset_reset() %s", st->name);
311
312         st->last_collected_time.tv_sec = 0;
313         st->last_collected_time.tv_usec = 0;
314         st->last_updated.tv_sec = 0;
315         st->last_updated.tv_usec = 0;
316         st->current_entry = 0;
317         st->counter = 0;
318         st->counter_done = 0;
319
320         RRDDIM *rd;
321         for(rd = st->dimensions; rd ; rd = rd->next) {
322                 rd->last_collected_time.tv_sec = 0;
323                 rd->last_collected_time.tv_usec = 0;
324                 rd->counter = 0;
325                 bzero(rd->values, rd->entries * sizeof(storage_number));
326         }
327 }
328
329 RRDSET *rrdset_create(const char *type, const char *id, const char *name, const char *family, const char *context, const char *title, const char *units, long priority, int update_every, int chart_type)
330 {
331         if(!type || !type[0]) {
332                 fatal("Cannot create rrd stats without a type.");
333                 return NULL;
334         }
335
336         if(!id || !id[0]) {
337                 fatal("Cannot create rrd stats without an id.");
338                 return NULL;
339         }
340
341         char fullid[RRD_ID_LENGTH_MAX + 1];
342         char fullfilename[FILENAME_MAX + 1];
343         RRDSET *st = NULL;
344
345         snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
346
347         st = rrdset_find(fullid);
348         if(st) {
349                 error("Cannot create rrd stats for '%s', it already exists.", fullid);
350                 return st;
351         }
352
353         long entries = config_get_number(fullid, "history", rrd_default_history_entries);
354         if(entries < 5) entries = config_set_number(fullid, "history", 5);
355         if(entries > RRD_HISTORY_ENTRIES_MAX) entries = config_set_number(fullid, "history", RRD_HISTORY_ENTRIES_MAX);
356
357         int enabled = config_get_boolean(fullid, "enabled", 1);
358         if(!enabled) entries = 5;
359
360         unsigned long size = sizeof(RRDSET);
361         char *cache_dir = rrdset_cache_dir(fullid);
362
363         debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
364
365         snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
366         if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0);
367         if(st) {
368                 if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
369                         errno = 0;
370                         info("Initializing file %s.", fullfilename);
371                         bzero(st, size);
372                 }
373                 else if(strcmp(st->id, fullid) != 0) {
374                         errno = 0;
375                         error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
376                         // munmap(st, size);
377                         // st = NULL;
378                         bzero(st, size);
379                 }
380                 else if(st->memsize != size || st->entries != entries) {
381                         errno = 0;
382                         error("File %s does not have the desired size. Clearing it.", fullfilename);
383                         bzero(st, size);
384                 }
385                 else if(st->update_every != update_every) {
386                         errno = 0;
387                         error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
388                         bzero(st, size);
389                 }
390                 else if((time(NULL) - st->last_updated.tv_sec) > update_every * entries) {
391                         errno = 0;
392                         error("File %s is too old. Clearing it.", fullfilename);
393                         bzero(st, size);
394                 }
395         }
396
397         if(st) {
398                 st->name = NULL;
399                 st->type = NULL;
400                 st->family = NULL;
401                 st->context = NULL;
402                 st->title = NULL;
403                 st->units = NULL;
404                 st->dimensions = NULL;
405                 st->next = NULL;
406                 st->mapped = rrd_memory_mode;
407         }
408         else {
409                 st = calloc(1, size);
410                 if(!st) {
411                         fatal("Cannot allocate memory for RRD_STATS %s.%s", type, id);
412                         return NULL;
413                 }
414                 st->mapped = RRD_MEMORY_MODE_RAM;
415         }
416         st->memsize = size;
417         st->entries = entries;
418         st->update_every = update_every;
419
420         if(st->current_entry >= st->entries) st->current_entry = 0;
421
422         strcpy(st->cache_filename, fullfilename);
423         strcpy(st->magic, RRDSET_MAGIC);
424
425         strcpy(st->id, fullid);
426         st->hash = simple_hash(st->id);
427
428         st->cache_dir = cache_dir;
429
430         st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
431         st->type       = config_get(st->id, "type", type);
432         st->family     = config_get(st->id, "family", family?family:st->type);
433         st->context    = config_get(st->id, "context", context?context:st->id);
434         st->units      = config_get(st->id, "units", units?units:"");
435
436         st->priority = config_get_number(st->id, "priority", priority);
437         st->enabled = enabled;
438
439         st->isdetail = 0;
440         st->debug = 0;
441
442         st->last_collected_time.tv_sec = 0;
443         st->last_collected_time.tv_usec = 0;
444         st->counter_done = 0;
445
446         st->gap_when_lost_iterations_above = (int) (
447                         config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
448
449         avl_init_lock(&st->dimensions_index, rrddim_compare);
450
451         pthread_rwlock_init(&st->rwlock, NULL);
452         pthread_rwlock_wrlock(&localhost.rrdset_root_rwlock);
453
454         if(name && *name) rrdset_set_name(st, name);
455         else rrdset_set_name(st, id);
456
457         {
458                 char varvalue[CONFIG_MAX_VALUE + 1];
459                 snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
460                 st->title = config_get(st->id, "title", varvalue);
461         }
462
463         st->next = localhost.rrdset_root;
464     localhost.rrdset_root = st;
465
466         rrdset_index_add(&localhost, st);
467
468         pthread_rwlock_unlock(&localhost.rrdset_root_rwlock);
469
470         return(st);
471 }
472
473 RRDDIM *rrddim_add(RRDSET *st, const char *id, const char *name, long multiplier, long divisor, int algorithm)
474 {
475         char filename[FILENAME_MAX + 1];
476         char fullfilename[FILENAME_MAX + 1];
477
478         char varname[CONFIG_MAX_NAME + 1];
479         RRDDIM *rd = NULL;
480         unsigned long size = sizeof(RRDDIM) + (st->entries * sizeof(storage_number));
481
482         debug(D_RRD_CALLS, "Adding dimension '%s/%s'.", st->id, id);
483
484         rrdset_strncpyz_name(filename, id, FILENAME_MAX);
485         snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename);
486         if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) rd = (RRDDIM *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 1);
487         if(rd) {
488                 struct timeval now;
489                 gettimeofday(&now, NULL);
490
491                 if(strcmp(rd->magic, RRDDIMENSION_MAGIC) != 0) {
492                         errno = 0;
493                         info("Initializing file %s.", fullfilename);
494                         bzero(rd, size);
495                 }
496                 else if(rd->memsize != size) {
497                         errno = 0;
498                         error("File %s does not have the desired size. Clearing it.", fullfilename);
499                         bzero(rd, size);
500                 }
501                 else if(rd->multiplier != multiplier) {
502                         errno = 0;
503                         error("File %s does not have the same multiplier. Clearing it.", fullfilename);
504                         bzero(rd, size);
505                 }
506                 else if(rd->divisor != divisor) {
507                         errno = 0;
508                         error("File %s does not have the same divisor. Clearing it.", fullfilename);
509                         bzero(rd, size);
510                 }
511                 else if(rd->algorithm != algorithm) {
512                         errno = 0;
513                         error("File %s does not have the same algorithm. Clearing it.", fullfilename);
514                         bzero(rd, size);
515                 }
516                 else if(rd->update_every != st->update_every) {
517                         errno = 0;
518                         error("File %s does not have the same refresh frequency. Clearing it.", fullfilename);
519                         bzero(rd, size);
520                 }
521                 else if(usecdiff(&now, &rd->last_collected_time) > (rd->entries * rd->update_every * 1000000ULL)) {
522                         errno = 0;
523                         error("File %s is too old. Clearing it.", fullfilename);
524                         bzero(rd, size);
525                 }
526                 else if(strcmp(rd->id, id) != 0) {
527                         errno = 0;
528                         error("File %s contents are not for dimension %s. Clearing it.", fullfilename, id);
529                         // munmap(rd, size);
530                         // rd = NULL;
531                         bzero(rd, size);
532                 }
533         }
534
535         if(rd) {
536                 // we have a file mapped for rd
537                 rd->mapped = rrd_memory_mode;
538                 rd->flags = 0x00000000;
539                 rd->next = NULL;
540                 rd->name = NULL;
541         }
542         else {
543                 // if we didn't manage to get a mmap'd dimension, just create one
544
545                 rd = calloc(1, size);
546                 if(!rd) {
547                         fatal("Cannot allocate RRD_DIMENSION %s/%s.", st->id, id);
548                         return NULL;
549                 }
550
551                 rd->mapped = RRD_MEMORY_MODE_RAM;
552         }
553         rd->memsize = size;
554
555         strcpy(rd->magic, RRDDIMENSION_MAGIC);
556         strcpy(rd->cache_filename, fullfilename);
557         strncpyz(rd->id, id, RRD_ID_LENGTH_MAX);
558         rd->hash = simple_hash(rd->id);
559
560         snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
561         rd->name = config_get(st->id, varname, (name && *name)?name:rd->id);
562
563         snprintfz(varname, CONFIG_MAX_NAME, "dim %s algorithm", rd->id);
564         rd->algorithm = rrddim_algorithm_id(config_get(st->id, varname, rrddim_algorithm_name(algorithm)));
565
566         snprintfz(varname, CONFIG_MAX_NAME, "dim %s multiplier", rd->id);
567         rd->multiplier = config_get_number(st->id, varname, multiplier);
568
569         snprintfz(varname, CONFIG_MAX_NAME, "dim %s divisor", rd->id);
570         rd->divisor = config_get_number(st->id, varname, divisor);
571         if(!rd->divisor) rd->divisor = 1;
572
573         rd->entries = st->entries;
574         rd->update_every = st->update_every;
575
576         // prevent incremental calculation spikes
577         rd->counter = 0;
578         rd->updated = 0;
579         rd->calculated_value = 0;
580         rd->last_calculated_value = 0;
581         rd->collected_value = 0;
582         rd->last_collected_value = 0;
583         rd->collected_volume = 0;
584         rd->stored_volume = 0;
585         rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
586         rd->last_collected_time.tv_sec = 0;
587         rd->last_collected_time.tv_usec = 0;
588
589         // append this dimension
590         pthread_rwlock_wrlock(&st->rwlock);
591         if(!st->dimensions)
592                 st->dimensions = rd;
593         else {
594                 RRDDIM *td = st->dimensions;
595                 for(; td->next; td = td->next) ;
596                 td->next = rd;
597         }
598         pthread_rwlock_unlock(&st->rwlock);
599
600         rrddim_index_add(st, rd);
601
602         return(rd);
603 }
604
605 void rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name)
606 {
607         debug(D_RRD_CALLS, "rrddim_set_name() %s.%s", st->name, rd->name);
608
609         char varname[CONFIG_MAX_NAME + 1];
610         snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
611         config_set_default(st->id, varname, name);
612 }
613
614 void rrddim_free(RRDSET *st, RRDDIM *rd)
615 {
616         debug(D_RRD_CALLS, "rrddim_free() %s.%s", st->name, rd->name);
617
618         RRDDIM *i, *last = NULL;
619         for(i = st->dimensions; i && i != rd ; i = i->next) last = i;
620
621         if(!i) {
622                 error("Request to free dimension %s.%s but it is not linked.", st->id, rd->name);
623                 return;
624         }
625
626         if(last) last->next = rd->next;
627         else st->dimensions = rd->next;
628         rd->next = NULL;
629
630         rrddim_index_del(st, rd);
631
632         // free(rd->annotations);
633         if(rd->mapped == RRD_MEMORY_MODE_SAVE) {
634                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
635                 savememory(rd->cache_filename, rd, rd->memsize);
636
637                 debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
638                 munmap(rd, rd->memsize);
639         }
640         else if(rd->mapped == RRD_MEMORY_MODE_MAP) {
641                 debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
642                 munmap(rd, rd->memsize);
643         }
644         else {
645                 debug(D_RRD_CALLS, "Removing dimension '%s'.", rd->name);
646                 free(rd);
647         }
648 }
649
650 void rrdset_free_all(void)
651 {
652         info("Freeing all memory...");
653
654         RRDSET *st;
655         for(st = localhost.rrdset_root; st ;) {
656                 RRDSET *next = st->next;
657
658                 while(st->dimensions)
659                         rrddim_free(st, st->dimensions);
660
661                 rrdset_index_del(&localhost, st);
662
663                 if(st->mapped == RRD_MEMORY_MODE_SAVE) {
664                         debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
665                         savememory(st->cache_filename, st, st->memsize);
666
667                         debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
668                         munmap(st, st->memsize);
669                 }
670                 else if(st->mapped == RRD_MEMORY_MODE_MAP) {
671                         debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
672                         munmap(st, st->memsize);
673                 }
674                 else
675                         free(st);
676
677                 st = next;
678         }
679     localhost.rrdset_root = NULL;
680
681         info("Memory cleanup completed...");
682 }
683
684 void rrdset_save_all(void) {
685         info("Saving database...");
686
687         RRDSET *st;
688         RRDDIM *rd;
689
690         pthread_rwlock_wrlock(&localhost.rrdset_root_rwlock);
691         for(st = localhost.rrdset_root; st ; st = st->next) {
692                 pthread_rwlock_wrlock(&st->rwlock);
693
694                 if(st->mapped == RRD_MEMORY_MODE_SAVE) {
695                         debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
696                         savememory(st->cache_filename, st, st->memsize);
697                 }
698
699                 for(rd = st->dimensions; rd ; rd = rd->next) {
700                         if(likely(rd->mapped == RRD_MEMORY_MODE_SAVE)) {
701                                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
702                                 savememory(rd->cache_filename, rd, rd->memsize);
703                         }
704                 }
705
706                 pthread_rwlock_unlock(&st->rwlock);
707         }
708         pthread_rwlock_unlock(&localhost.rrdset_root_rwlock);
709 }
710
711
712 RRDSET *rrdset_find(const char *id)
713 {
714         debug(D_RRD_CALLS, "rrdset_find() for chart %s", id);
715
716         RRDSET *st = rrdset_index_find(&localhost, id, 0);
717         return(st);
718 }
719
720 RRDSET *rrdset_find_bytype(const char *type, const char *id)
721 {
722         debug(D_RRD_CALLS, "rrdset_find_bytype() for chart %s.%s", type, id);
723
724         char buf[RRD_ID_LENGTH_MAX + 1];
725
726         strncpyz(buf, type, RRD_ID_LENGTH_MAX - 1);
727         strcat(buf, ".");
728         int len = (int) strlen(buf);
729         strncpyz(&buf[len], id, (size_t) (RRD_ID_LENGTH_MAX - len));
730
731         return(rrdset_find(buf));
732 }
733
734 RRDSET *rrdset_find_byname(const char *name)
735 {
736         debug(D_RRD_CALLS, "rrdset_find_byname() for chart %s", name);
737
738         RRDSET *st = rrdset_index_find_name(&localhost, name, 0);
739         return(st);
740 }
741
742 RRDDIM *rrddim_find(RRDSET *st, const char *id)
743 {
744         debug(D_RRD_CALLS, "rrddim_find() for chart %s, dimension %s", st->name, id);
745
746         return rrddim_index_find(st, id, 0);
747 }
748
749 int rrddim_hide(RRDSET *st, const char *id)
750 {
751         debug(D_RRD_CALLS, "rrddim_hide() for chart %s, dimension %s", st->name, id);
752
753         RRDDIM *rd = rrddim_find(st, id);
754         if(unlikely(!rd)) {
755                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
756                 return 1;
757         }
758
759         rd->flags |= RRDDIM_FLAG_HIDDEN;
760         return 0;
761 }
762
763 int rrddim_unhide(RRDSET *st, const char *id)
764 {
765         debug(D_RRD_CALLS, "rrddim_unhide() for chart %s, dimension %s", st->name, id);
766
767         RRDDIM *rd = rrddim_find(st, id);
768         if(unlikely(!rd)) {
769                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
770                 return 1;
771         }
772
773         if(rd->flags & RRDDIM_FLAG_HIDDEN) rd->flags ^= RRDDIM_FLAG_HIDDEN;
774         return 0;
775 }
776
777 collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value)
778 {
779         debug(D_RRD_CALLS, "rrddim_set_by_pointer() for chart %s, dimension %s, value " COLLECTED_NUMBER_FORMAT, st->name, rd->name, value);
780
781         gettimeofday(&rd->last_collected_time, NULL);
782         rd->collected_value = value;
783         rd->updated = 1;
784         rd->counter++;
785
786         return rd->last_collected_value;
787 }
788
789 collected_number rrddim_set(RRDSET *st, const char *id, collected_number value)
790 {
791         RRDDIM *rd = rrddim_find(st, id);
792         if(unlikely(!rd)) {
793                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
794                 return 0;
795         }
796
797         return rrddim_set_by_pointer(st, rd, value);
798 }
799
800 void rrdset_next_usec(RRDSET *st, unsigned long long microseconds)
801 {
802         if(!microseconds) rrdset_next(st);
803         else {
804                 debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
805
806                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
807                 st->usec_since_last_update = microseconds;
808         }
809 }
810
811 void rrdset_next(RRDSET *st)
812 {
813         unsigned long long microseconds = 0;
814
815         if(likely(st->last_collected_time.tv_sec)) {
816                 struct timeval now;
817                 gettimeofday(&now, NULL);
818                 microseconds = usecdiff(&now, &st->last_collected_time);
819         }
820         // prevent infinite loop
821         else microseconds = st->update_every * 1000000ULL;
822
823         rrdset_next_usec(st, microseconds);
824 }
825
826 void rrdset_next_plugins(RRDSET *st)
827 {
828         rrdset_next(st);
829 }
830
831 unsigned long long rrdset_done(RRDSET *st)
832 {
833         if(unlikely(netdata_exit)) return 0;
834
835         debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
836
837         RRDDIM *rd, *last;
838         int oldstate, store_this_entry = 1, first_entry = 0;
839         unsigned long long last_ut, now_ut, next_ut, stored_entries = 0;
840
841         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0))
842                 error("Cannot set pthread cancel state to DISABLE.");
843
844         // a read lock is OK here
845         pthread_rwlock_rdlock(&st->rwlock);
846
847         // enable the chart, if it was disabled
848         if(unlikely(rrd_delete_unupdated_dimensions) && !st->enabled)
849                 st->enabled = 1;
850
851         // check if the chart has a long time to be updated
852         if(unlikely(st->usec_since_last_update > st->entries * st->update_every * 1000000ULL)) {
853                 info("%s: took too long to be updated (%0.3Lf secs). Reseting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
854                 rrdset_reset(st);
855                 st->usec_since_last_update = st->update_every * 1000000ULL;
856                 first_entry = 1;
857         }
858         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
859
860         // set last_collected_time
861         if(unlikely(!st->last_collected_time.tv_sec)) {
862                 // it is the first entry
863                 // set the last_collected_time to now
864                 gettimeofday(&st->last_collected_time, NULL);
865
866                 // the first entry should not be stored
867                 store_this_entry = 0;
868                 first_entry = 1;
869
870                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
871         }
872         else {
873                 // it is not the first entry
874                 // calculate the proper last_collected_time, using usec_since_last_update
875                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec + st->usec_since_last_update;
876                 st->last_collected_time.tv_sec = (time_t) (ut / 1000000ULL);
877                 st->last_collected_time.tv_usec = (suseconds_t) (ut % 1000000ULL);
878         }
879
880         // if this set has not been updated in the past
881         // we fake the last_update time to be = now - usec_since_last_update
882         if(unlikely(!st->last_updated.tv_sec)) {
883                 // it has never been updated before
884                 // set a fake last_updated, in the past using usec_since_last_update
885                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
886                 st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
887                 st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
888
889                 // the first entry should not be stored
890                 store_this_entry = 0;
891                 first_entry = 1;
892
893                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
894         }
895
896         // check if we will re-write the entire data set
897         if(unlikely(usecdiff(&st->last_collected_time, &st->last_updated) > st->update_every * st->entries * 1000000ULL)) {
898                 info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Reseting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
899                 rrdset_reset(st);
900
901                 st->usec_since_last_update = st->update_every * 1000000ULL;
902
903                 gettimeofday(&st->last_collected_time, NULL);
904
905                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
906                 st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
907                 st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
908
909                 // the first entry should not be stored
910                 store_this_entry = 0;
911                 first_entry = 1;
912         }
913
914         // these are the 3 variables that will help us in interpolation
915         // last_ut = the last time we added a value to the storage
916         //  now_ut = the time the current value is taken at
917         // next_ut = the time of the next interpolation point
918         last_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec;
919         now_ut  = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec;
920         next_ut = (st->last_updated.tv_sec + st->update_every) * 1000000ULL;
921
922         if(unlikely(!first_entry && now_ut < next_ut)) {
923                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
924         }
925
926         if(unlikely(st->debug)) {
927                 debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
928                 debug(D_RRD_STATS, "%s: now  ut = %0.3Lf (current update time)", st->name, (long double)now_ut/1000000.0);
929                 debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
930         }
931
932         if(unlikely(!st->counter_done)) {
933                 store_this_entry = 0;
934                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
935         }
936         st->counter_done++;
937
938         // calculate totals and count the dimensions
939         int dimensions;
940         st->collected_total = 0;
941         for( rd = st->dimensions, dimensions = 0 ; likely(rd) ; rd = rd->next, dimensions++ )
942                 if(likely(rd->updated)) st->collected_total += rd->collected_value;
943
944         uint32_t storage_flags = SN_EXISTS;
945
946         // process all dimensions to calculate their values
947         // based on the collected figures only
948         // at this stage we do not interpolate anything
949         for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
950
951                 if(unlikely(!rd->updated)) {
952                         rd->calculated_value = 0;
953                         continue;
954                 }
955
956                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: START "
957                         " last_collected_value = " COLLECTED_NUMBER_FORMAT
958                         " collected_value = " COLLECTED_NUMBER_FORMAT
959                         " last_calculated_value = " CALCULATED_NUMBER_FORMAT
960                         " calculated_value = " CALCULATED_NUMBER_FORMAT
961                         , st->id, rd->name
962                         , rd->last_collected_value
963                         , rd->collected_value
964                         , rd->last_calculated_value
965                         , rd->calculated_value
966                         );
967
968                 switch(rd->algorithm) {
969                         case RRDDIM_ABSOLUTE:
970                                 rd->calculated_value = (calculated_number)rd->collected_value
971                                         * (calculated_number)rd->multiplier
972                                         / (calculated_number)rd->divisor;
973
974                                 if(unlikely(st->debug))
975                                         debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
976                                                 CALCULATED_NUMBER_FORMAT " = "
977                                                 COLLECTED_NUMBER_FORMAT
978                                                 " * " CALCULATED_NUMBER_FORMAT
979                                                 " / " CALCULATED_NUMBER_FORMAT
980                                                 , st->id, rd->name
981                                                 , rd->calculated_value
982                                                 , rd->collected_value
983                                                 , (calculated_number)rd->multiplier
984                                                 , (calculated_number)rd->divisor
985                                                 );
986                                 break;
987
988                         case RRDDIM_PCENT_OVER_ROW_TOTAL:
989                                 if(unlikely(!st->collected_total))
990                                         rd->calculated_value = 0;
991                                 else
992                                         // the percentage of the current value
993                                         // over the total of all dimensions
994                                         rd->calculated_value =
995                                               (calculated_number)100
996                                             * (calculated_number)rd->collected_value
997                                             / (calculated_number)st->collected_total;
998
999                                 if(unlikely(st->debug))
1000                                         debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
1001                                                 CALCULATED_NUMBER_FORMAT " = 100"
1002                                                 " * " COLLECTED_NUMBER_FORMAT
1003                                                 " / " COLLECTED_NUMBER_FORMAT
1004                                                 , st->id, rd->name
1005                                                 , rd->calculated_value
1006                                                 , rd->collected_value
1007                                                 , st->collected_total
1008                                                 );
1009                                 break;
1010
1011                         case RRDDIM_INCREMENTAL:
1012                                 if(unlikely(rd->counter <= 1)) {
1013                                         rd->calculated_value = 0;
1014                                         continue;
1015                                 }
1016
1017                                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1018                                 // to reset the calculation (it will give zero as the calculation for this second)
1019                                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1020                                         debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1021                                                         , st->name, rd->name
1022                                                         , rd->last_collected_value
1023                                                         , rd->collected_value);
1024                                         if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1025                                         rd->last_collected_value = rd->collected_value;
1026                                 }
1027
1028                                 rd->calculated_value =
1029                                       (calculated_number)(rd->collected_value - rd->last_collected_value)
1030                                     * (calculated_number)rd->multiplier
1031                                     / (calculated_number)rd->divisor;
1032
1033                                 if(unlikely(st->debug))
1034                                         debug(D_RRD_STATS, "%s/%s: CALC INC PRE "
1035                                                 CALCULATED_NUMBER_FORMAT " = ("
1036                                                 COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
1037                                                 ")"
1038                                                 " * " CALCULATED_NUMBER_FORMAT
1039                                                 " / " CALCULATED_NUMBER_FORMAT
1040                                                 , st->id, rd->name
1041                                                 , rd->calculated_value
1042                                                 , rd->collected_value, rd->last_collected_value
1043                                                 , (calculated_number)rd->multiplier
1044                                                 , (calculated_number)rd->divisor
1045                                                 );
1046                                 break;
1047
1048                         case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1049                                 if(unlikely(rd->counter <= 1)) {
1050                                         rd->calculated_value = 0;
1051                                         continue;
1052                                 }
1053
1054                                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1055                                 // to reset the calculation (it will give zero as the calculation for this second)
1056                                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1057                                         debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1058                                         , st->name, rd->name
1059                                         , rd->last_collected_value
1060                                         , rd->collected_value);
1061                                         if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1062                                         rd->last_collected_value = rd->collected_value;
1063                                 }
1064
1065                                 // the percentage of the current increment
1066                                 // over the increment of all dimensions together
1067                                 if(unlikely(st->collected_total == st->last_collected_total))
1068                                         rd->calculated_value = 0;
1069                                 else
1070                                         rd->calculated_value =
1071                                                   (calculated_number)100
1072                                                 * (calculated_number)(rd->collected_value - rd->last_collected_value)
1073                                                 / (calculated_number)(st->collected_total - st->last_collected_total);
1074
1075                                 if(unlikely(st->debug))
1076                                         debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
1077                                                 CALCULATED_NUMBER_FORMAT " = 100"
1078                                                 " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1079                                                 " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1080                                                 , st->id, rd->name
1081                                                 , rd->calculated_value
1082                                                 , rd->collected_value, rd->last_collected_value
1083                                                 , st->collected_total, st->last_collected_total
1084                                                 );
1085                                 break;
1086
1087                         default:
1088                                 // make the default zero, to make sure
1089                                 // it gets noticed when we add new types
1090                                 rd->calculated_value = 0;
1091
1092                                 if(unlikely(st->debug))
1093                                         debug(D_RRD_STATS, "%s/%s: CALC "
1094                                                 CALCULATED_NUMBER_FORMAT " = 0"
1095                                                 , st->id, rd->name
1096                                                 , rd->calculated_value
1097                                                 );
1098                                 break;
1099                 }
1100
1101                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: PHASE2 "
1102                         " last_collected_value = " COLLECTED_NUMBER_FORMAT
1103                         " collected_value = " COLLECTED_NUMBER_FORMAT
1104                         " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1105                         " calculated_value = " CALCULATED_NUMBER_FORMAT
1106                         , st->id, rd->name
1107                         , rd->last_collected_value
1108                         , rd->collected_value
1109                         , rd->last_calculated_value
1110                         , rd->calculated_value
1111                         );
1112
1113         }
1114
1115         // at this point we have all the calculated values ready
1116         // it is now time to interpolate values on a second boundary
1117
1118         unsigned long long first_ut = last_ut;
1119         long long iterations = (now_ut - last_ut) / (st->update_every * 1000000ULL);
1120         if((now_ut % (st->update_every * 1000000ULL)) == 0) iterations++;
1121
1122         for( ; likely(next_ut <= now_ut) ; next_ut += st->update_every * 1000000ULL, iterations-- ) {
1123 #ifdef NETDATA_INTERNAL_CHECKS
1124                 if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_ut = %llu, next_ut = %llu, now_ut = %llu", st->name, first_ut, last_ut, next_ut, now_ut); }
1125 #endif
1126
1127                 if(unlikely(st->debug)) {
1128                         debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
1129                         debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
1130                 }
1131
1132                 st->last_updated.tv_sec = (time_t) (next_ut / 1000000ULL);
1133                 st->last_updated.tv_usec = 0;
1134
1135                 for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1136                         calculated_number new_value;
1137
1138                         switch(rd->algorithm) {
1139                                 case RRDDIM_INCREMENTAL:
1140                                         new_value = (calculated_number)
1141                                                 (          rd->calculated_value
1142                                                         * (calculated_number)(next_ut - last_ut)
1143                                                         / (calculated_number)(now_ut - last_ut)
1144                                                 );
1145
1146                                         if(unlikely(st->debug))
1147                                                 debug(D_RRD_STATS, "%s/%s: CALC2 INC "
1148                                                         CALCULATED_NUMBER_FORMAT " = "
1149                                                         CALCULATED_NUMBER_FORMAT
1150                                                         " * %llu"
1151                                                         " / %llu"
1152                                                         , st->id, rd->name
1153                                                         , new_value
1154                                                         , rd->calculated_value
1155                                                         , (next_ut - last_ut)
1156                                                         , (now_ut - last_ut)
1157                                                         );
1158
1159                                         rd->calculated_value -= new_value;
1160                                         new_value += rd->last_calculated_value;
1161                                         rd->last_calculated_value = 0;
1162                                         new_value /= (calculated_number)st->update_every;
1163                                         break;
1164
1165                                 case RRDDIM_ABSOLUTE:
1166                                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
1167                                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1168                                 default:
1169                                         if(iterations == 1) {
1170                                                 // this is the last iteration
1171                                                 // do not interpolate
1172                                                 // just show the calculated value
1173
1174                                                 new_value = rd->calculated_value;
1175                                         }
1176                                         else {
1177                                                 // we have missed an update
1178                                                 // interpolate in the middle values
1179
1180                                                 new_value = (calculated_number)
1181                                                         (       (         (rd->calculated_value - rd->last_calculated_value)
1182                                                                         * (calculated_number)(next_ut - first_ut)
1183                                                                         / (calculated_number)(now_ut - first_ut)
1184                                                                 )
1185                                                                 +  rd->last_calculated_value
1186                                                         );
1187
1188                                                 if(unlikely(st->debug))
1189                                                         debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
1190                                                                 CALCULATED_NUMBER_FORMAT " = ((("
1191                                                                 "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
1192                                                                 " * %llu"
1193                                                                 " / %llu) + " CALCULATED_NUMBER_FORMAT
1194                                                                 , st->id, rd->name
1195                                                                 , new_value
1196                                                                 , rd->calculated_value, rd->last_calculated_value
1197                                                                 , (next_ut - first_ut)
1198                                                                 , (now_ut - first_ut), rd->last_calculated_value
1199                                                                 );
1200
1201                                                 // this is wrong
1202                                                 // it fades the value towards the target
1203                                                 // while we know the calculated value is different
1204                                                 // if(likely(next_ut + st->update_every * 1000000ULL > now_ut)) rd->calculated_value = new_value;
1205                                         }
1206                                         break;
1207                         }
1208
1209                         if(unlikely(!store_this_entry)) {
1210                                 store_this_entry = 1;
1211                                 continue;
1212                         }
1213
1214                         if(likely(rd->updated && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
1215                                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
1216
1217                                 if(unlikely(st->debug))
1218                                         debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
1219                                                 CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
1220                                                 , st->id, rd->name
1221                                                 , st->current_entry
1222                                                 , unpack_storage_number(rd->values[st->current_entry]), new_value
1223                                                 );
1224                         }
1225                         else {
1226                                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
1227                                                 , st->id, rd->name
1228                                                 , st->current_entry
1229                                                 );
1230                                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
1231                         }
1232
1233                         stored_entries++;
1234
1235                         if(unlikely(st->debug)) {
1236                                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
1237                                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
1238                                 calculated_number accuracy = accuracy_loss(t1, t2);
1239                                 debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
1240                                                 , st->id, rd->name
1241                                                 , st->current_entry
1242                                                 , t2
1243                                                 , get_storage_number_flags(rd->values[st->current_entry])
1244                                                 , t1
1245                                                 , accuracy
1246                                                 , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1247                                                 );
1248
1249                                 rd->collected_volume += t1;
1250                                 rd->stored_volume += t2;
1251                                 accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
1252                                 debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated  = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
1253                                                 , st->id, rd->name
1254                                                 , st->current_entry
1255                                                 , rd->stored_volume
1256                                                 , rd->collected_volume
1257                                                 , accuracy
1258                                                 , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1259                                                 );
1260
1261                         }
1262                 }
1263                 // reset the storage flags for the next point, if any;
1264                 storage_flags = SN_EXISTS;
1265
1266                 st->counter++;
1267                 st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
1268                 last_ut = next_ut;
1269         }
1270
1271         // align next interpolation to last collection point
1272         if(likely(stored_entries || !store_this_entry)) {
1273                 st->last_updated.tv_sec = st->last_collected_time.tv_sec;
1274                 st->last_updated.tv_usec = st->last_collected_time.tv_usec;
1275                 st->last_collected_total  = st->collected_total;
1276         }
1277
1278         for( rd = st->dimensions; likely(rd) ; rd = rd->next ) {
1279                 if(unlikely(!rd->updated)) continue;
1280
1281                 if(likely(stored_entries || !store_this_entry)) {
1282                         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
1283                         rd->last_collected_value = rd->collected_value;
1284
1285                         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
1286                         rd->last_calculated_value = rd->calculated_value;
1287                 }
1288
1289                 rd->calculated_value = 0;
1290                 rd->collected_value = 0;
1291                 rd->updated = 0;
1292
1293                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: END "
1294                         " last_collected_value = " COLLECTED_NUMBER_FORMAT
1295                         " collected_value = " COLLECTED_NUMBER_FORMAT
1296                         " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1297                         " calculated_value = " CALCULATED_NUMBER_FORMAT
1298                         , st->id, rd->name
1299                         , rd->last_collected_value
1300                         , rd->collected_value
1301                         , rd->last_calculated_value
1302                         , rd->calculated_value
1303                         );
1304         }
1305
1306         // ALL DONE ABOUT THE DATA UPDATE
1307         // --------------------------------------------------------------------
1308
1309         // find if there are any obsolete dimensions (not updated recently)
1310         if(unlikely(rrd_delete_unupdated_dimensions)) {
1311
1312                 for( rd = st->dimensions; likely(rd) ; rd = rd->next )
1313                         if((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)
1314                                 break;
1315
1316                 if(unlikely(rd)) {
1317                         // there is dimension to free
1318                         // upgrade our read lock to a write lock
1319                         pthread_rwlock_unlock(&st->rwlock);
1320                         pthread_rwlock_wrlock(&st->rwlock);
1321
1322                         for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
1323                                 // remove it only it is not updated in rrd_delete_unupdated_dimensions seconds
1324
1325                                 if(unlikely((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)) {
1326                                         info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
1327
1328                                         if(unlikely(!last)) {
1329                                                 st->dimensions = rd->next;
1330                                                 rd->next = NULL;
1331                                                 rrddim_free(st, rd);
1332                                                 rd = st->dimensions;
1333                                                 continue;
1334                                         }
1335                                         else {
1336                                                 last->next = rd->next;
1337                                                 rd->next = NULL;
1338                                                 rrddim_free(st, rd);
1339                                                 rd = last->next;
1340                                                 continue;
1341                                         }
1342                                 }
1343
1344                                 last = rd;
1345                                 rd = rd->next;
1346                         }
1347
1348                         if(unlikely(!st->dimensions)) {
1349                                 info("Disabling chart %s (%s) since it does not have any dimensions", st->name, st->id);
1350                                 st->enabled = 0;
1351                         }
1352                 }
1353         }
1354
1355         pthread_rwlock_unlock(&st->rwlock);
1356
1357         if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0))
1358                 error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
1359
1360         return(st->usec_since_last_update);
1361 }