]> arthur.barton.de Git - netdata.git/blob - src/rrd.c
added libavl for supporting balanced binary trees - this improves search performance...
[netdata.git] / src / rrd.c
1 #include <string.h>
2 #include <unistd.h>
3 #include <time.h>
4 #include <sys/time.h>
5 #include <sys/mman.h>
6 #include <pthread.h>
7 #include <errno.h>
8 #include <ctype.h>
9 #include <sys/stat.h>
10 #include <sys/types.h>
11 #include <stdlib.h>
12
13 #include "log.h"
14 #include "config.h"
15 #include "common.h"
16
17 #include "rrd.h"
18
19 // ----------------------------------------------------------------------------
20 // globals
21
22 int rrd_update_every = UPDATE_EVERY;
23 int rrd_default_history_entries = RRD_DEFAULT_HISTORY_ENTRIES;
24
25 RRDSET *rrdset_root = NULL;
26 pthread_rwlock_t rrdset_root_rwlock = PTHREAD_RWLOCK_INITIALIZER;
27
28 int rrd_memory_mode = RRD_MEMORY_MODE_SAVE;
29
30
31 // ----------------------------------------------------------------------------
32 // RRDSET index
33
34 static int rrdset_iterator(avl *a) { if(a) {}; return 0; }
35
36 static int rrdset_compare(void* a, void* b) {
37         if(((RRDSET *)a)->hash < ((RRDSET *)b)->hash) return -1;
38         else if(((RRDSET *)a)->hash > ((RRDSET *)b)->hash) return 1;
39         else return strcmp(((RRDSET *)a)->id, ((RRDSET *)b)->id);
40 }
41
42 avl_tree rrdset_root_index = {
43                 NULL,
44                 rrdset_compare
45 };
46
// insert a chart into / remove a chart from the global chart index
#define rrdset_index_add(chart) avl_insert(&rrdset_root_index, (avl *)(chart))
#define rrdset_index_del(chart) avl_remove(&rrdset_root_index, (avl *)(chart))
49
50 static RRDSET *rrdset_index_find(const char *id, unsigned long hash) {
51         RRDSET *result = NULL, tmp;
52         strncpy(tmp.id, id, RRD_ID_LENGTH_MAX);
53         tmp.id[RRD_ID_LENGTH_MAX] = '\0';
54         tmp.hash = (hash)?hash:simple_hash(tmp.id);
55
56         avl_search(&(rrdset_root_index), (avl *)&tmp, rrdset_iterator, (avl **)&result);
57         return result;
58 }
59
60 // ----------------------------------------------------------------------------
61 // RRDDIM index
62
63 static int rrddim_iterator(avl *a) { if(a) {}; return 0; }
64
65 static int rrddim_compare(void* a, void* b) {
66         if(((RRDDIM *)a)->hash < ((RRDDIM *)b)->hash) return -1;
67         else if(((RRDDIM *)a)->hash > ((RRDDIM *)b)->hash) return 1;
68         else return strcmp(((RRDDIM *)a)->id, ((RRDDIM *)b)->id);
69 }
70
// insert a dimension into / remove a dimension from the index of its chart
#define rrddim_index_add(chart, dim) avl_insert(&((chart)->dimensions_index), (avl *)(dim))
#define rrddim_index_del(chart, dim) avl_remove(&((chart)->dimensions_index), (avl *)(dim))
73
74 static RRDDIM *rrddim_index_find(RRDSET *st, const char *id, unsigned long hash) {
75         RRDDIM *result = NULL, tmp;
76         tmp.hash = (hash)?hash:simple_hash(id);
77         strncpy(tmp.id, id, RRD_ID_LENGTH_MAX);
78         tmp.id[RRD_ID_LENGTH_MAX] = '\0';
79         tmp.hash = (hash)?hash:simple_hash(tmp.id);
80
81         avl_search(&(st->dimensions_index), (avl *)&tmp, rrddim_iterator, (avl **)&result);
82         return result;
83 }
84
85 // ----------------------------------------------------------------------------
86 // chart types
87
88 int rrdset_type_id(const char *name)
89 {
90         if(strcmp(name, RRDSET_TYPE_AREA_NAME) == 0) return RRDSET_TYPE_AREA;
91         if(strcmp(name, RRDSET_TYPE_STACKED_NAME) == 0) return RRDSET_TYPE_STACKED;
92         if(strcmp(name, RRDSET_TYPE_LINE_NAME) == 0) return RRDSET_TYPE_LINE;
93         return RRDSET_TYPE_LINE;
94 }
95
96 const char *rrdset_type_name(int chart_type)
97 {
98         static char line[] = RRDSET_TYPE_LINE_NAME;
99         static char area[] = RRDSET_TYPE_AREA_NAME;
100         static char stacked[] = RRDSET_TYPE_STACKED_NAME;
101
102         switch(chart_type) {
103                 case RRDSET_TYPE_LINE:
104                         return line;
105
106                 case RRDSET_TYPE_AREA:
107                         return area;
108
109                 case RRDSET_TYPE_STACKED:
110                         return stacked;
111         }
112         return line;
113 }
114
115 // ----------------------------------------------------------------------------
116 // load / save
117
118 const char *rrd_memory_mode_name(int id)
119 {
120         static const char ram[] = RRD_MEMORY_MODE_RAM_NAME;
121         static const char map[] = RRD_MEMORY_MODE_MAP_NAME;
122         static const char save[] = RRD_MEMORY_MODE_SAVE_NAME;
123
124         switch(id) {
125                 case RRD_MEMORY_MODE_RAM:
126                         return ram;
127
128                 case RRD_MEMORY_MODE_MAP:
129                         return map;
130
131                 case RRD_MEMORY_MODE_SAVE:
132                 default:
133                         return save;
134         }
135
136         return save;
137 }
138
139 int rrd_memory_mode_id(const char *name)
140 {
141         if(!strcmp(name, RRD_MEMORY_MODE_RAM_NAME))
142                 return RRD_MEMORY_MODE_RAM;
143         else if(!strcmp(name, RRD_MEMORY_MODE_MAP_NAME))
144                 return RRD_MEMORY_MODE_MAP;
145
146         return RRD_MEMORY_MODE_SAVE;
147 }
148
149 // ----------------------------------------------------------------------------
150 // algorithms types
151
152 int rrddim_algorithm_id(const char *name)
153 {
154         if(strcmp(name, RRDDIM_ABSOLUTE_NAME) == 0)                     return RRDDIM_ABSOLUTE;
155         if(strcmp(name, RRDDIM_INCREMENTAL_NAME) == 0)                  return RRDDIM_INCREMENTAL;
156         if(strcmp(name, RRDDIM_PCENT_OVER_ROW_TOTAL_NAME) == 0)                 return RRDDIM_PCENT_OVER_ROW_TOTAL;
157         if(strcmp(name, RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME) == 0)        return RRDDIM_PCENT_OVER_DIFF_TOTAL;
158         return RRDDIM_ABSOLUTE;
159 }
160
161 const char *rrddim_algorithm_name(int chart_type)
162 {
163         static char absolute[] = RRDDIM_ABSOLUTE_NAME;
164         static char incremental[] = RRDDIM_INCREMENTAL_NAME;
165         static char percentage_of_absolute_row[] = RRDDIM_PCENT_OVER_ROW_TOTAL_NAME;
166         static char percentage_of_incremental_row[] = RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME;
167
168         switch(chart_type) {
169                 case RRDDIM_ABSOLUTE:
170                         return absolute;
171
172                 case RRDDIM_INCREMENTAL:
173                         return incremental;
174
175                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
176                         return percentage_of_absolute_row;
177
178                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
179                         return percentage_of_incremental_row;
180         }
181         return absolute;
182 }
183
184 // ----------------------------------------------------------------------------
185 // chart names
186
// Copy a name, replacing every character that is not a dot, a letter or a
// digit with an underscore.
// to     - destination buffer, with room for at least `length` bytes
// from   - the NUL terminated source string
// length - the size of `to`; at most length - 1 characters are copied and
//          the result is always NUL terminated
// Returns `to`. When length is zero or negative nothing is written.
char *rrdset_strncpy_name(char *to, const char *from, int length)
{
        // without this guard, the terminator would be written before the buffer
        if(length <= 0) return to;

        int i;
        for(i = 0; i < length - 1 && from[i] ; i++) {
                // the <ctype.h> functions are only defined for unsigned char values (and EOF)
                unsigned char c = (unsigned char)from[i];

                if(c == '.' || isalnum(c)) to[i] = from[i];
                else to[i] = '_';
        }
        to[i] = '\0';

        return to;
}
199
200 void rrdset_set_name(RRDSET *st, const char *name)
201 {
202         char b[CONFIG_MAX_VALUE + 1];
203         char n[RRD_ID_LENGTH_MAX + 1];
204
205         snprintf(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
206         rrdset_strncpy_name(b, n, CONFIG_MAX_VALUE);
207         st->name = config_get(st->id, "name", b);
208         st->hash_name = simple_hash(st->name);
209 }
210
211 // ----------------------------------------------------------------------------
212 // cache directory
213
214 char *rrdset_cache_dir(const char *id)
215 {
216         char *ret = NULL;
217
218         static char *cache_dir = NULL;
219         if(!cache_dir) cache_dir = config_get("global", "database directory", "cache");
220
221         char b[FILENAME_MAX + 1];
222         char n[FILENAME_MAX + 1];
223         rrdset_strncpy_name(b, id, FILENAME_MAX);
224
225         snprintf(n, FILENAME_MAX, "%s/%s", cache_dir, b);
226         ret = config_get(id, "database directory", n);
227
228         if(rrd_memory_mode == RRD_MEMORY_MODE_MAP || rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
229                 int r = mkdir(ret, 0775);
230                 if(r != 0 && errno != EEXIST)
231                         error("Cannot create directory '%s'", ret);
232         }
233
234         return ret;
235 }
236
237 // ----------------------------------------------------------------------------
238 // core functions
239
240 void rrdset_reset(RRDSET *st)
241 {
242         st->last_collected_time.tv_sec = 0;
243         st->last_collected_time.tv_usec = 0;
244         st->last_updated.tv_sec = 0;
245         st->last_updated.tv_usec = 0;
246         st->current_entry = 0;
247         st->counter = 0;
248         st->counter_done = 0;
249
250         RRDDIM *rd;
251         for(rd = st->dimensions; rd ; rd = rd->next) {
252                 rd->last_collected_time.tv_sec = 0;
253                 rd->last_collected_time.tv_usec = 0;
254                 bzero(rd->values, rd->entries * sizeof(storage_number));
255         }
256 }
257
258 RRDSET *rrdset_create(const char *type, const char *id, const char *name, const char *family, const char *title, const char *units, long priority, int update_every, int chart_type)
259 {
260         if(!id || !id[0]) {
261                 fatal("Cannot create rrd stats without an id.");
262                 return NULL;
263         }
264
265         char fullid[RRD_ID_LENGTH_MAX + 1];
266         char fullfilename[FILENAME_MAX + 1];
267         RRDSET *st = NULL;
268
269         snprintf(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
270
271         long entries = config_get_number(fullid, "history", rrd_default_history_entries);
272         if(entries < 5) entries = config_set_number(fullid, "history", 5);
273         if(entries > RRD_HISTORY_ENTRIES_MAX) entries = config_set_number(fullid, "history", RRD_HISTORY_ENTRIES_MAX);
274
275         int enabled = config_get_boolean(fullid, "enabled", 1);
276         if(!enabled) entries = 5;
277
278         unsigned long size = sizeof(RRDSET);
279         char *cache_dir = rrdset_cache_dir(fullid);
280
281         debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
282
283         snprintf(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
284         if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE));
285         if(st) {
286                 if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
287                         errno = 0;
288                         info("Initializing file %s.", fullfilename);
289                         bzero(st, size);
290                 }
291                 else if(strcmp(st->id, fullid) != 0) {
292                         errno = 0;
293                         error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
294                         // munmap(st, size);
295                         // st = NULL;
296                         bzero(st, size);
297                 }
298                 else if(st->memsize != size || st->entries != entries) {
299                         errno = 0;
300                         error("File %s does not have the desired size. Clearing it.", fullfilename);
301                         bzero(st, size);
302                 }
303                 else if(st->update_every != update_every) {
304                         errno = 0;
305                         error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
306                         bzero(st, size);
307                 }
308                 else if((time(NULL) - st->last_updated.tv_sec) > update_every * entries) {
309                         errno = 0;
310                         error("File %s is too old. Clearing it.", fullfilename);
311                         bzero(st, size);
312                 }
313         }
314
315         if(st) {
316                 st->name = NULL;
317                 st->type = NULL;
318                 st->family = NULL;
319                 st->title = NULL;
320                 st->units = NULL;
321                 st->dimensions = NULL;
322                 st->next = NULL;
323                 st->mapped = rrd_memory_mode;
324         }
325         else {
326                 st = calloc(1, size);
327                 if(!st) {
328                         fatal("Cannot allocate memory for RRD_STATS %s.%s", type, id);
329                         return NULL;
330                 }
331                 st->mapped = RRD_MEMORY_MODE_RAM;
332         }
333         st->memsize = size;
334         st->entries = entries;
335         st->update_every = update_every;
336
337         strcpy(st->cache_filename, fullfilename);
338         strcpy(st->magic, RRDSET_MAGIC);
339
340         strcpy(st->id, fullid);
341         st->hash = simple_hash(st->id);
342
343         st->cache_dir = cache_dir;
344
345         st->family     = config_get(st->id, "family", family?family:st->id);
346         st->units      = config_get(st->id, "units", units?units:"");
347         st->type       = config_get(st->id, "type", type);
348         st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
349
350         if(name && *name) rrdset_set_name(st, name);
351         else rrdset_set_name(st, id);
352
353         {
354                 char varvalue[CONFIG_MAX_VALUE + 1];
355                 snprintf(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
356                 st->title = config_get(st->id, "title", varvalue);
357         }
358
359         st->priority = config_get_number(st->id, "priority", priority);
360         st->enabled = enabled;
361         
362         st->isdetail = 0;
363         st->debug = 0;
364
365         st->last_collected_time.tv_sec = 0;
366         st->last_collected_time.tv_usec = 0;
367         st->counter_done = 0;
368
369         st->gap_when_lost_iterations_above = config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS);
370
371         st->dimensions_index.root = NULL;
372         st->dimensions_index.compar = rrddim_compare;
373
374         pthread_rwlock_init(&st->rwlock, NULL);
375         pthread_rwlock_wrlock(&rrdset_root_rwlock);
376
377         st->next = rrdset_root;
378         rrdset_root = st;
379
380         rrdset_index_add(st);
381
382         pthread_rwlock_unlock(&rrdset_root_rwlock);
383
384         return(st);
385 }
386
387 RRDDIM *rrddim_add(RRDSET *st, const char *id, const char *name, long multiplier, long divisor, int algorithm)
388 {
389         char filename[FILENAME_MAX + 1];
390         char fullfilename[FILENAME_MAX + 1];
391
392         char varname[CONFIG_MAX_NAME + 1];
393         RRDDIM *rd = NULL;
394         unsigned long size = sizeof(RRDDIM) + (st->entries * sizeof(storage_number));
395
396         debug(D_RRD_CALLS, "Adding dimension '%s/%s'.", st->id, id);
397
398         rrdset_strncpy_name(filename, id, FILENAME_MAX);
399         snprintf(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename);
400         if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) rd = (RRDDIM *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE));
401         if(rd) {
402                 struct timeval now;
403                 gettimeofday(&now, NULL);
404
405                 if(strcmp(rd->magic, RRDDIMENSION_MAGIC) != 0) {
406                         errno = 0;
407                         info("Initializing file %s.", fullfilename);
408                         bzero(rd, size);
409                 }
410                 else if(rd->memsize != size) {
411                         errno = 0;
412                         error("File %s does not have the desired size. Clearing it.", fullfilename);
413                         bzero(rd, size);
414                 }
415                 else if(rd->multiplier != multiplier) {
416                         errno = 0;
417                         error("File %s does not have the same multiplier. Clearing it.", fullfilename);
418                         bzero(rd, size);
419                 }
420                 else if(rd->divisor != divisor) {
421                         errno = 0;
422                         error("File %s does not have the same divisor. Clearing it.", fullfilename);
423                         bzero(rd, size);
424                 }
425                 else if(rd->algorithm != algorithm) {
426                         errno = 0;
427                         error("File %s does not have the same algorithm. Clearing it.", fullfilename);
428                         bzero(rd, size);
429                 }
430                 else if(rd->update_every != st->update_every) {
431                         errno = 0;
432                         error("File %s does not have the same refresh frequency. Clearing it.", fullfilename);
433                         bzero(rd, size);
434                 }
435                 else if(usecdiff(&now, &rd->last_collected_time) > (rd->entries * rd->update_every * 1000000ULL)) {
436                         errno = 0;
437                         error("File %s is too old. Clearing it.", fullfilename);
438                         bzero(rd, size);
439                 }
440                 else if(strcmp(rd->id, id) != 0) {
441                         errno = 0;
442                         error("File %s contents are not for dimension %s. Clearing it.", fullfilename, id);
443                         // munmap(rd, size);
444                         // rd = NULL;
445                         bzero(rd, size);
446                 }
447         }
448
449         if(rd) {
450                 // we have a file mapped for rd
451                 rd->mapped = rrd_memory_mode;
452                 rd->hidden = 0;
453                 rd->next = NULL;
454                 rd->name = NULL;
455         }
456         else {
457                 // if we didn't manage to get a mmap'd dimension, just create one
458
459                 rd = calloc(1, size);
460                 if(!rd) {
461                         fatal("Cannot allocate RRD_DIMENSION %s/%s.", st->id, id);
462                         return NULL;
463                 }
464
465                 rd->mapped = RRD_MEMORY_MODE_RAM;
466         }
467         rd->memsize = size;
468
469         strcpy(rd->magic, RRDDIMENSION_MAGIC);
470         strcpy(rd->cache_filename, fullfilename);
471         strncpy(rd->id, id, RRD_ID_LENGTH_MAX);
472         rd->hash = simple_hash(rd->id);
473
474         snprintf(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
475         rd->name = config_get(st->id, varname, (name && *name)?name:rd->id);
476
477         snprintf(varname, CONFIG_MAX_NAME, "dim %s algorithm", rd->id);
478         rd->algorithm = rrddim_algorithm_id(config_get(st->id, varname, rrddim_algorithm_name(algorithm)));
479
480         snprintf(varname, CONFIG_MAX_NAME, "dim %s multiplier", rd->id);
481         rd->multiplier = config_get_number(st->id, varname, multiplier);
482
483         snprintf(varname, CONFIG_MAX_NAME, "dim %s divisor", rd->id);
484         rd->divisor = config_get_number(st->id, varname, divisor);
485         if(!rd->divisor) rd->divisor = 1;
486
487         rd->entries = st->entries;
488         rd->update_every = st->update_every;
489         
490         // append this dimension
491         if(!st->dimensions)
492                 st->dimensions = rd;
493         else {
494                 RRDDIM *td = st->dimensions;
495                 for(; td->next; td = td->next) ;
496                 td->next = rd;
497         }
498
499         rrddim_index_add(st, rd);
500
501         return(rd);
502 }
503
504 void rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name)
505 {
506         char varname[CONFIG_MAX_NAME + 1];
507         snprintf(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
508         config_get(st->id, varname, name);
509 }
510
511 void rrddim_free(RRDSET *st, RRDDIM *rd)
512 {
513         RRDDIM *i = st->dimensions, *last = NULL;
514         for(i = st->dimensions; i && i != rd ; i = i->next) last = i;
515
516         if(!i) {
517                 error("Request to free dimension %s.%s but it is not linked.", st->id, rd->name);
518                 return;
519         }
520
521         if(last) last = i->next;
522         else st->dimensions = i->next;
523
524         rrddim_index_del(st, rd);
525
526         // free(rd->annotations);
527         if(rd->mapped == RRD_MEMORY_MODE_SAVE) {
528                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
529                 savememory(rd->cache_filename, rd, rd->memsize);
530
531                 debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
532                 munmap(rd, rd->memsize);
533         }
534         else if(rd->mapped == RRD_MEMORY_MODE_MAP) {
535                 debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
536                 munmap(rd, rd->memsize);
537         }
538         else {
539                 debug(D_RRD_CALLS, "Removing dimension '%s'.", rd->name);
540                 free(rd);
541         }
542 }
543
544 void rrdset_free_all(void)
545 {
546         info("Freeing all memory...");
547
548         RRDSET *st;
549         for(st = rrdset_root; st ;) {
550                 RRDSET *next = st->next;
551
552                 while(st->dimensions)
553                         rrddim_free(st, st->dimensions);
554
555                 rrdset_index_del(st);
556
557                 if(st->mapped == RRD_MEMORY_MODE_SAVE) {
558                         debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
559                         savememory(st->cache_filename, st, st->memsize);
560
561                         debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
562                         munmap(st, st->memsize);
563                 }
564                 else if(st->mapped == RRD_MEMORY_MODE_MAP) {
565                         debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
566                         munmap(st, st->memsize);
567                 }
568                 else
569                         free(st);
570
571                 st = next;
572         }
573         rrdset_root = NULL;
574
575         info("Memory cleanup completed...");
576 }
577
578 void rrdset_save_all(void)
579 {
580         RRDSET *st;
581         RRDDIM *rd;
582
583         pthread_rwlock_wrlock(&rrdset_root_rwlock);
584         for(st = rrdset_root; st ; st = st->next) {
585                 pthread_rwlock_wrlock(&st->rwlock);
586
587                 if(st->mapped == RRD_MEMORY_MODE_SAVE) {
588                         debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
589                         savememory(st->cache_filename, st, st->memsize);
590                 }
591
592                 for(rd = st->dimensions; rd ; rd = rd->next) {
593                         if(rd->mapped == RRD_MEMORY_MODE_SAVE) {
594                                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
595                                 savememory(rd->cache_filename, rd, rd->memsize);
596                         }
597                 }
598
599                 pthread_rwlock_unlock(&st->rwlock);
600         }
601         pthread_rwlock_unlock(&rrdset_root_rwlock);
602 }
603
604
605 RRDSET *rrdset_find(const char *id)
606 {
607         debug(D_RRD_CALLS, "rrd_stats_find() for chart %s", id);
608
609         pthread_rwlock_rdlock(&rrdset_root_rwlock);
610         RRDSET *st = rrdset_index_find(id, 0);
611         pthread_rwlock_unlock(&rrdset_root_rwlock);
612
613         return(st);
614 }
615
616 RRDSET *rrdset_find_bytype(const char *type, const char *id)
617 {
618         debug(D_RRD_CALLS, "rrd_stats_find_bytype() for chart %s.%s", type, id);
619
620         char buf[RRD_ID_LENGTH_MAX + 1];
621
622         strncpy(buf, type, RRD_ID_LENGTH_MAX - 1);
623         buf[RRD_ID_LENGTH_MAX - 1] = '\0';
624         strcat(buf, ".");
625         int len = strlen(buf);
626         strncpy(&buf[len], id, RRD_ID_LENGTH_MAX - len);
627         buf[RRD_ID_LENGTH_MAX] = '\0';
628
629         return(rrdset_find(buf));
630 }
631
632 RRDSET *rrdset_find_byname(const char *name)
633 {
634         debug(D_RRD_CALLS, "rrd_stats_find_byname() for chart %s", name);
635
636         char b[CONFIG_MAX_VALUE + 1];
637
638         rrdset_strncpy_name(b, name, CONFIG_MAX_VALUE);
639         unsigned long hash = simple_hash(b);
640
641         pthread_rwlock_rdlock(&rrdset_root_rwlock);
642         RRDSET *st = rrdset_root;
643         for ( ; st ; st = st->next ) {
644                 if(hash == st->hash_name && strcmp(st->name, b) == 0) break;
645         }
646         pthread_rwlock_unlock(&rrdset_root_rwlock);
647
648         return(st);
649 }
650
651 RRDDIM *rrddim_find(RRDSET *st, const char *id)
652 {
653         debug(D_RRD_CALLS, "rrd_stats_dimension_find() for chart %s, dimension %s", st->name, id);
654
655         return rrddim_index_find(st, id, 0);
656 }
657
658 int rrddim_hide(RRDSET *st, const char *id)
659 {
660         debug(D_RRD_CALLS, "rrd_stats_dimension_hide() for chart %s, dimension %s", st->name, id);
661
662         RRDDIM *rd = rrddim_find(st, id);
663         if(!rd) {
664                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
665                 return 1;
666         }
667
668         rd->hidden = 1;
669         return 0;
670 }
671
672 void rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value)
673 {
674         debug(D_RRD_CALLS, "rrd_stats_dimension_set() for chart %s, dimension %s, value " COLLECTED_NUMBER_FORMAT, st->name, rd->name, value);
675         
676         gettimeofday(&rd->last_collected_time, NULL);
677         rd->collected_value = value;
678         rd->updated = 1;
679 }
680
681 int rrddim_set(RRDSET *st, const char *id, collected_number value)
682 {
683         RRDDIM *rd = rrddim_find(st, id);
684         if(!rd) {
685                 error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
686                 return 1;
687         }
688
689         rrddim_set_by_pointer(st, rd, value);
690         return 0;
691 }
692
693 void rrdset_next_usec(RRDSET *st, unsigned long long microseconds)
694 {
695         debug(D_RRD_CALLS, "rrd_stats_next() for chart %s with microseconds %llu", st->name, microseconds);
696
697         if(st->debug) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
698         st->usec_since_last_update = microseconds;
699 }
700
701 void rrdset_next(RRDSET *st)
702 {
703         unsigned long long microseconds = 0;
704
705         if(st->last_collected_time.tv_sec) {
706                 struct timeval now;
707                 gettimeofday(&now, NULL);
708                 microseconds = usecdiff(&now, &st->last_collected_time);
709         }
710
711         rrdset_next_usec(st, microseconds);
712 }
713
714 void rrdset_next_plugins(RRDSET *st)
715 {
716         rrdset_next(st);
717 }
718
719 unsigned long long rrdset_done(RRDSET *st)
720 {
721         debug(D_RRD_CALLS, "rrd_stats_done() for chart %s", st->name);
722
723         RRDDIM *rd, *last;
724         int oldstate, store_this_entry = 1;
725
726         if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0)
727                 error("Cannot set pthread cancel state to DISABLE.");
728
729         // a read lock is OK here
730         pthread_rwlock_rdlock(&st->rwlock);
731
732         // check if the chart has a long time to be refreshed
733         if(st->usec_since_last_update > st->entries * st->update_every * 1000000ULL) {
734                 info("%s: took too long to be updated (%0.3Lf secs). Reseting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
735                 rrdset_reset(st);
736                 st->usec_since_last_update = st->update_every * 1000000ULL;
737         }
738         if(st->debug) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
739
740         // set last_collected_time
741         if(!st->last_collected_time.tv_sec) {
742                 // it is the first entry
743                 // set the last_collected_time to now
744                 gettimeofday(&st->last_collected_time, NULL);
745
746                 // the first entry should not be stored
747                 store_this_entry = 0;
748
749                 if(st->debug) debug(D_RRD_STATS, "%s: initializing last_collected to now. Will not store the next entry.", st->name);
750         }
751         else {
752                 // it is not the first entry
753                 // calculate the proper last_collected_time, using usec_since_last_update
754                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec + st->usec_since_last_update;
755                 st->last_collected_time.tv_sec = ut / 1000000ULL;
756                 st->last_collected_time.tv_usec = ut % 1000000ULL;
757         }
758
759         // if this set has not been updated in the past
760         // we fake the last_update time to be = now - usec_since_last_update
761         if(!st->last_updated.tv_sec) {
762                 // it has never been updated before
763                 // set a fake last_updated, in the past using usec_since_last_update
764                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
765                 st->last_updated.tv_sec = ut / 1000000ULL;
766                 st->last_updated.tv_usec = ut % 1000000ULL;
767
768                 // the first entry should not be stored
769                 store_this_entry = 0;
770
771                 if(st->debug) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
772         }
773
774         // check if we will re-write the entire data set
775         if(usecdiff(&st->last_collected_time, &st->last_updated) > st->update_every * st->entries * 1000000ULL) {
776                 info("%s: too old data (last updated at %u.%u, last collected at %u.%u). Reseting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
777                 rrdset_reset(st);
778
779                 st->usec_since_last_update = st->update_every * 1000000ULL;
780
781                 gettimeofday(&st->last_collected_time, NULL);
782
783                 unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
784                 st->last_updated.tv_sec = ut / 1000000ULL;
785                 st->last_updated.tv_usec = ut % 1000000ULL;
786
787                 // the first entry should not be stored
788                 store_this_entry = 0;
789         }
790
791         // these are the 3 variables that will help us in interpolation
792         // last_ut = the last time we added a value to the storage
793         //  now_ut = the time the current value is taken at
794         // next_ut = the time of the next interpolation point
795         unsigned long long last_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec;
796         unsigned long long now_ut  = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec;
797         unsigned long long next_ut = (st->last_updated.tv_sec + st->update_every) * 1000000ULL;
798
799         if(st->debug) debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
800         if(st->debug) debug(D_RRD_STATS, "%s: now  ut = %0.3Lf (current update time)", st->name, (long double)now_ut/1000000.0);
801         if(st->debug) debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
802
803         if(!st->counter_done) {
804                 store_this_entry = 0;
805                 if(st->debug) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
806         }
807         st->counter_done++;
808
809         // calculate totals and count the dimensions
810         int dimensions;
811         st->collected_total = 0;
812         for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ )
813                 st->collected_total += rd->collected_value;
814
815         uint32_t storage_flags = SN_EXISTS;
816
817         // process all dimensions to calculate their values
818         // based on the collected figures only
819         // at this stage we do not interpolate anything
820         for( rd = st->dimensions ; rd ; rd = rd->next ) {
821
822                 if(st->debug) debug(D_RRD_STATS, "%s/%s: "
823                         " last_collected_value = " COLLECTED_NUMBER_FORMAT
824                         " collected_value = " COLLECTED_NUMBER_FORMAT
825                         " last_calculated_value = " CALCULATED_NUMBER_FORMAT
826                         " calculated_value = " CALCULATED_NUMBER_FORMAT
827                         , st->id, rd->name
828                         , rd->last_collected_value
829                         , rd->collected_value
830                         , rd->last_calculated_value
831                         , rd->calculated_value
832                         );
833
834                 switch(rd->algorithm) {
835                         case RRDDIM_PCENT_OVER_DIFF_TOTAL:
836                                 // the percentage of the current increment
837                                 // over the increment of all dimensions together
838                                 if(st->collected_total == st->last_collected_total) rd->calculated_value = rd->last_calculated_value;
839                                 else rd->calculated_value =
840                                           (calculated_number)100
841                                         * (calculated_number)(rd->collected_value - rd->last_collected_value)
842                                         / (calculated_number)(st->collected_total  - st->last_collected_total);
843
844                                 if(st->debug)
845                                         debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
846                                                 CALCULATED_NUMBER_FORMAT " = 100"
847                                                 " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
848                                                 " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
849                                                 , st->id, rd->name
850                                                 , rd->calculated_value
851                                                 , rd->collected_value, rd->last_collected_value
852                                                 , st->collected_total, st->last_collected_total
853                                                 );
854                                 break;
855
856                         case RRDDIM_PCENT_OVER_ROW_TOTAL:
857                                 if(!st->collected_total) rd->calculated_value = 0;
858                                 else
859                                 // the percentage of the current value
860                                 // over the total of all dimensions
861                                 rd->calculated_value =
862                                           (calculated_number)100
863                                         * (calculated_number)rd->collected_value
864                                         / (calculated_number)st->collected_total;
865
866                                 if(st->debug)
867                                         debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
868                                                 CALCULATED_NUMBER_FORMAT " = 100"
869                                                 " * " COLLECTED_NUMBER_FORMAT
870                                                 " / " COLLECTED_NUMBER_FORMAT
871                                                 , st->id, rd->name
872                                                 , rd->calculated_value
873                                                 , rd->collected_value
874                                                 , st->collected_total
875                                                 );
876                                 break;
877
878                         case RRDDIM_INCREMENTAL:
879                                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
880                                 // to reset the calculation (it will give zero as the calculation for this second)
881                                 if(rd->last_collected_value > rd->collected_value) {
882                                         info("%s.%s: Detect RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
883                                                         , st->name, rd->name
884                                                         , rd->last_collected_value
885                                                         , rd->collected_value);
886                                         storage_flags = SN_EXISTS_RESET;
887                                         rd->last_collected_value = rd->collected_value;
888                                 }
889
890                                 rd->calculated_value += (calculated_number)(rd->collected_value - rd->last_collected_value);
891
892                                 if(st->debug)
893                                         debug(D_RRD_STATS, "%s/%s: CALC INC "
894                                                 CALCULATED_NUMBER_FORMAT " += "
895                                                 COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
896                                                 , st->id, rd->name
897                                                 , rd->calculated_value
898                                                 , rd->collected_value, rd->last_collected_value
899                                                 );
900                                 break;
901
902                         case RRDDIM_ABSOLUTE:
903                                 rd->calculated_value = (calculated_number)rd->collected_value;
904
905                                 if(st->debug)
906                                         debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
907                                                 CALCULATED_NUMBER_FORMAT " = "
908                                                 COLLECTED_NUMBER_FORMAT
909                                                 , st->id, rd->name
910                                                 , rd->calculated_value
911                                                 , rd->collected_value
912                                                 );
913                                 break;
914
915                         default:
916                                 // make the default zero, to make sure
917                                 // it gets noticed when we add new types
918                                 rd->calculated_value = 0;
919
920                                 if(st->debug)
921                                         debug(D_RRD_STATS, "%s/%s: CALC "
922                                                 CALCULATED_NUMBER_FORMAT " = 0"
923                                                 , st->id, rd->name
924                                                 , rd->calculated_value
925                                                 );
926                                 break;
927                 }
928         }
929
930         // at this point we have all the calculated values ready
931         // it is now time to interpolate values on a second boundary
932
933         unsigned long long first_ut = last_ut;
934         int iterations = (now_ut - last_ut) / (st->update_every * 1000000ULL);
935
936         for( ; next_ut <= now_ut ; next_ut += st->update_every * 1000000ULL, iterations-- ) {
937                 if(iterations < 0) error("iterations calculation wrapped!");
938
939                 if(st->debug) debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
940                 if(st->debug) debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
941
942                 st->last_updated.tv_sec = next_ut / 1000000ULL;
943                 st->last_updated.tv_usec = 0;
944
945                 for( rd = st->dimensions ; rd ; rd = rd->next ) {
946                         calculated_number new_value;
947
948                         switch(rd->algorithm) {
949                                 case RRDDIM_INCREMENTAL:
950                                         new_value = (calculated_number)
951                                                 (          rd->calculated_value
952                                                         * (calculated_number)(next_ut - last_ut)
953                                                         / (calculated_number)(now_ut - last_ut)
954                                                 );
955
956                                         if(st->debug)
957                                                 debug(D_RRD_STATS, "%s/%s: CALC2 INC "
958                                                         CALCULATED_NUMBER_FORMAT " = "
959                                                         CALCULATED_NUMBER_FORMAT
960                                                         " * %llu"
961                                                         " / %llu"
962                                                         , st->id, rd->name
963                                                         , new_value
964                                                         , rd->calculated_value
965                                                         , (unsigned long long)(next_ut - last_ut)
966                                                         , (unsigned long long)(now_ut - last_ut)
967                                                         );
968
969                                         rd->calculated_value -= new_value;
970                                         break;
971
972                                 case RRDDIM_ABSOLUTE:
973                                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
974                                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
975                                 default:
976                                         new_value = (calculated_number)
977                                                 (       (         (rd->calculated_value - rd->last_calculated_value)
978                                                                 * (calculated_number)(next_ut - first_ut)
979                                                                 / (calculated_number)(now_ut - first_ut)
980                                                         )
981                                                         +  rd->last_calculated_value
982                                                 );
983
984                                         if(st->debug)
985                                                 debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
986                                                         CALCULATED_NUMBER_FORMAT " = ((("
987                                                         "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
988                                                         " * %llu"
989                                                         " / %llu) + " CALCULATED_NUMBER_FORMAT
990                                                         , st->id, rd->name
991                                                         , new_value
992                                                         , rd->calculated_value, rd->last_calculated_value
993                                                         , (next_ut - first_ut)
994                                                         , (now_ut - first_ut), rd->last_calculated_value
995                                                         );
996
997                                         if(next_ut + st->update_every * 1000000ULL > now_ut) rd->calculated_value = new_value;
998                                         break;
999                         }
1000
1001                         if(!store_this_entry) {
1002                                 store_this_entry = 1;
1003                                 continue;
1004                         }
1005
1006                         if(rd->updated && iterations < st->gap_when_lost_iterations_above) {
1007                                 rd->values[st->current_entry] = pack_storage_number(
1008                                                   new_value
1009                                                 * (calculated_number)rd->multiplier
1010                                                 / (calculated_number)rd->divisor
1011                                         , storage_flags );
1012
1013                                 if(st->debug)
1014                                         debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
1015                                                 CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
1016                                                 " * %ld"
1017                                                 " / %ld"
1018                                                 , st->id, rd->name
1019                                                 , st->current_entry
1020                                                 , unpack_storage_number(rd->values[st->current_entry]), new_value
1021                                                 , rd->multiplier
1022                                                 , rd->divisor
1023                                                 );
1024                         }
1025                         else {
1026                                 if(st->debug) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
1027                                                 , st->id, rd->name
1028                                                 , st->current_entry
1029                                                 );
1030                                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
1031                         }
1032
1033                         if(st->debug) {
1034                                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
1035                                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
1036                                 calculated_number accuracy = accuracy_loss(t1, t2);
1037                                 debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
1038                                                 , st->id, rd->name
1039                                                 , st->current_entry
1040                                                 , t2
1041                                                 , get_storage_number_flags(rd->values[st->current_entry])
1042                                                 , t1
1043                                                 , accuracy
1044                                                 , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1045                                                 );
1046
1047                                 rd->collected_volume += t1;
1048                                 rd->stored_volume += t2;
1049                                 accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
1050                                 debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated  = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
1051                                                 , st->id, rd->name
1052                                                 , st->current_entry
1053                                                 , rd->stored_volume
1054                                                 , rd->collected_volume
1055                                                 , accuracy
1056                                                 , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1057                                                 );
1058
1059                         }
1060                 }
1061                 // reset the storage flags for the next point, if any;
1062                 storage_flags = SN_EXISTS;
1063
1064                 if(st->first_entry_t && st->counter >= (unsigned long long)st->entries) {
1065                         // the db is overwriting values
1066                         // add the value we will overwrite
1067                         st->first_entry_t += st->update_every * 1000000ULL;
1068                 }
1069                 
1070                 st->counter++;
1071                 st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
1072                 if(!st->first_entry_t) st->first_entry_t = next_ut;
1073                 last_ut = next_ut;
1074         }
1075
1076         for( rd = st->dimensions; rd ; rd = rd->next ) {
1077                 if(!rd->updated) continue;
1078                 rd->last_collected_value = rd->collected_value;
1079                 rd->last_calculated_value = rd->calculated_value;
1080                 rd->collected_value = 0;
1081                 rd->updated = 0;
1082
1083                 // if this is the first entry of incremental dimensions
1084                 // we have to set the first calculated_value to zero
1085                 // to eliminate the first spike
1086                 if(st->counter_done == 1) switch(rd->algorithm) {
1087                         case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1088                         case RRDDIM_INCREMENTAL:
1089                                 rd->calculated_value = 0;
1090                                 // the next time, a new incremental total will be calculated
1091                                 break;
1092                 }
1093         }
1094         st->last_collected_total  = st->collected_total;
1095
1096         // ALL DONE ABOUT THE DATA UPDATE
1097         // --------------------------------------------------------------------
1098
1099         // find if there are any obsolete dimensions (not updated recently)
1100         for( rd = st->dimensions; rd ; rd = rd->next )
1101                 if((rd->last_collected_time.tv_sec + (10 * st->update_every)) < st->last_collected_time.tv_sec)
1102                         break;
1103
1104         if(rd) {
1105                 // there is dimension to free
1106                 // upgrade our read lock to a write lock
1107                 pthread_rwlock_unlock(&st->rwlock);
1108                 pthread_rwlock_wrlock(&st->rwlock);
1109
1110                 for( rd = st->dimensions, last = NULL ; rd ; ) {
1111                         if((rd->last_collected_time.tv_sec + (10 * st->update_every)) < st->last_collected_time.tv_sec) { // remove it only it is not updated in 10 seconds
1112                                 debug(D_RRD_STATS, "Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
1113
1114                                 if(!last) {
1115                                         st->dimensions = rd->next;
1116                                         rd->next = NULL;
1117                                         rrddim_free(st, rd);
1118                                         rd = st->dimensions;
1119                                         continue;
1120                                 }
1121                                 else {
1122                                         last->next = rd->next;
1123                                         rd->next = NULL;
1124                                         rrddim_free(st, rd);
1125                                         rd = last->next;
1126                                         continue;
1127                                 }
1128                         }
1129
1130                         last = rd;
1131                         rd = rd->next;
1132                 }
1133
1134                 if(!st->dimensions) st->enabled = 0;
1135         }
1136
1137         pthread_rwlock_unlock(&st->rwlock);
1138
1139         if(pthread_setcancelstate(oldstate, NULL) != 0)
1140                 error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
1141
1142         return(st->usec_since_last_update);
1143 }
1144
1145
1146 // find the oldest entry in the data, skipping all empty slots
1147 time_t rrdset_first_entry_t(RRDSET *st)
1148 {
1149         if(!st->first_entry_t) return st->last_updated.tv_sec;
1150         
1151         return st->first_entry_t / 1000000;
1152 }