]> arthur.barton.de Git - netdata.git/blob - src/rrdhost.c
allow each netdata host to have its own thread for streaming metrics
[netdata.git] / src / rrdhost.c
1 #define NETDATA_RRD_INTERNALS 1
2 #include "common.h"
3
4 RRDHOST *localhost = NULL;
5
6 pthread_rwlock_t rrd_rwlock = PTHREAD_RWLOCK_INITIALIZER;
7
8
9 // ----------------------------------------------------------------------------
10 // RRDHOST index
11
12 int rrdhost_compare(void* a, void* b) {
13     if(((RRDHOST *)a)->hash_machine_guid < ((RRDHOST *)b)->hash_machine_guid) return -1;
14     else if(((RRDHOST *)a)->hash_machine_guid > ((RRDHOST *)b)->hash_machine_guid) return 1;
15     else return strcmp(((RRDHOST *)a)->machine_guid, ((RRDHOST *)b)->machine_guid);
16 }
17
18 avl_tree_lock rrdhost_root_index = {
19         .avl_tree = { NULL, rrdhost_compare },
20         .rwlock = AVL_LOCK_INITIALIZER
21 };
22
23 RRDHOST *rrdhost_find(const char *guid, uint32_t hash) {
24     debug(D_RRDHOST, "Searching in index for host with guid '%s'", guid);
25
26     RRDHOST tmp;
27     strncpyz(tmp.machine_guid, guid, GUID_LEN);
28     tmp.hash_machine_guid = (hash)?hash:simple_hash(tmp.machine_guid);
29
30     return (RRDHOST *)avl_search_lock(&(rrdhost_root_index), (avl *) &tmp);
31 }
32
// insert a host into the hosts index - evaluates to the RRDHOST pointer returned by avl_insert_lock()
#define rrdhost_index_add(rrdhost) ((RRDHOST *)avl_insert_lock(&rrdhost_root_index, (avl *)(rrdhost)))

// remove a host from the hosts index - evaluates to the RRDHOST pointer returned by avl_remove_lock()
#define rrdhost_index_del(rrdhost) ((RRDHOST *)avl_remove_lock(&rrdhost_root_index, (avl *)(rrdhost)))
35
36
37 // ----------------------------------------------------------------------------
38 // RRDHOST - internal helpers
39
40 static inline void rrdhost_init_hostname(RRDHOST *host, const char *hostname) {
41     freez(host->hostname);
42     host->hostname = strdupz(hostname);
43     host->hash_hostname = simple_hash(host->hostname);
44 }
45
46 static inline void rrdhost_init_os(RRDHOST *host, const char *os) {
47     freez(host->os);
48     host->os = strdupz(os?os:"unknown");
49 }
50
51 static inline void rrdhost_init_machine_guid(RRDHOST *host, const char *machine_guid) {
52     strncpy(host->machine_guid, machine_guid, GUID_LEN);
53     host->machine_guid[GUID_LEN] = '\0';
54     host->hash_machine_guid = simple_hash(host->machine_guid);
55 }
56
57
58 // ----------------------------------------------------------------------------
59 // RRDHOST - add a host
60
61 RRDHOST *rrdhost_create(const char *hostname,
62         const char *guid,
63         const char *os,
64         int update_every,
65         int entries,
66         RRD_MEMORY_MODE memory_mode,
67         int health_enabled) {
68
69     debug(D_RRDHOST, "Host '%s': adding with guid '%s'", hostname, guid);
70
71     RRDHOST *host = callocz(1, sizeof(RRDHOST));
72
73     host->rrd_update_every    = update_every;
74     host->rrd_history_entries = entries;
75     host->rrd_memory_mode     = memory_mode;
76     host->health_enabled      = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled;
77     host->rrdpush_enabled     = default_rrdpush_enabled;
78     host->rrdpush_exclusive   = default_rrdpush_exclusive;
79
80     host->rrdpush_pipe[0] = -1;
81     host->rrdpush_pipe[1] = -1;
82     host->rrdpush_socket = -1;
83
84     pthread_mutex_init(&host->rrdpush_mutex, NULL);
85     pthread_rwlock_init(&host->rrdhost_rwlock, NULL);
86
87     rrdhost_init_hostname(host, hostname);
88     rrdhost_init_machine_guid(host, guid);
89     rrdhost_init_os(host, os);
90
91     avl_init_lock(&(host->rrdset_root_index),      rrdset_compare);
92     avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name);
93     avl_init_lock(&(host->rrdfamily_root_index),   rrdfamily_compare);
94     avl_init_lock(&(host->variables_root_index),   rrdvar_compare);
95
96     // ------------------------------------------------------------------------
97     // initialize health variables
98
99     host->health_log.next_log_id = 1;
100     host->health_log.next_alarm_id = 1;
101     host->health_log.max = 1000;
102     host->health_log.next_log_id =
103     host->health_log.next_alarm_id = (uint32_t)now_realtime_sec();
104
105     long n = config_get_number("health", "in memory max health log entries", host->health_log.max);
106     if(n < 10) {
107         error("Host '%s': health configuration has invalid max log entries %ld. Using default %u", host->hostname, n, host->health_log.max);
108         config_set_number("health", "in memory max health log entries", (long)host->health_log.max);
109     }
110     else
111         host->health_log.max = (unsigned int)n;
112
113     pthread_rwlock_init(&(host->health_log.alarm_log_rwlock), NULL);
114
115     char filename[FILENAME_MAX + 1];
116
117     if(!localhost) {
118         // this is localhost
119
120         host->cache_dir  = strdupz(netdata_configured_cache_dir);
121         host->varlib_dir = strdupz(netdata_configured_varlib_dir);
122
123     }
124     else {
125         // this is not localhost - append our GUID to localhost path
126
127         snprintfz(filename, FILENAME_MAX, "%s/%s", netdata_configured_cache_dir, host->machine_guid);
128         host->cache_dir = strdupz(filename);
129
130         if(host->rrd_memory_mode == RRD_MEMORY_MODE_MAP || host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
131             int r = mkdir(host->cache_dir, 0775);
132             if(r != 0 && errno != EEXIST)
133                 error("Host '%s': cannot create directory '%s'", host->hostname, host->cache_dir);
134         }
135
136         snprintfz(filename, FILENAME_MAX, "%s/%s", netdata_configured_varlib_dir, host->machine_guid);
137         host->varlib_dir = strdupz(filename);
138
139         if(host->health_enabled) {
140             int r = mkdir(host->varlib_dir, 0775);
141             if(r != 0 && errno != EEXIST)
142                 error("Host '%s': cannot create directory '%s'", host->hostname, host->varlib_dir);
143
144             snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir);
145             r = mkdir(filename, 0775);
146             if(r != 0 && errno != EEXIST)
147                 error("Host '%s': cannot create directory '%s'", host->hostname, filename);
148         }
149
150     }
151
152     snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir);
153     host->health_log_filename = strdupz(config_get("health", "health db file", filename));
154
155     snprintfz(filename, FILENAME_MAX, "%s/alarm-notify.sh", netdata_configured_plugins_dir);
156     host->health_default_exec = strdupz(config_get("health", "script to execute on alarm", filename));
157     host->health_default_recipient = strdup("root");
158
159
160     // ------------------------------------------------------------------------
161     // load health configuration
162
163     if(host->health_enabled) {
164         health_alarm_log_load(host);
165         health_alarm_log_open(host);
166
167         rrdhost_wrlock(host);
168         health_readdir(host, health_config_dir());
169         rrdhost_unlock(host);
170     }
171
172
173     // ------------------------------------------------------------------------
174     // link it and add it to the index
175
176     rrd_wrlock();
177
178     if(localhost) {
179         host->next = localhost->next;
180         localhost->next = host;
181     }
182
183     if(rrdhost_index_add(host) != host)
184         fatal("Host '%s': cannot add host to index. It already exists.", hostname);
185
186     rrd_unlock();
187
188     debug(D_RRDHOST, "Host '%s', added with guid '%s'", host->hostname, host->machine_guid);
189     return host;
190 }
191
192 RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) {
193     debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid);
194
195     RRDHOST *host = rrdhost_find(guid, 0);
196     if(!host) {
197         host = rrdhost_create(hostname, guid, os, update_every, history, mode, health_enabled);
198     }
199     else {
200         host->health_enabled = health_enabled;
201
202         if(strcmp(host->hostname, hostname)) {
203             char *t = host->hostname;
204             char *n = strdupz(hostname);
205             host->hostname = n;
206             freez(t);
207         }
208
209         if(host->rrd_update_every != update_every)
210             error("Host '%s' has an update frequency of %d seconds, but the wanted one is %d seconds.", host->hostname, host->rrd_update_every, update_every);
211
212         if(host->rrd_history_entries != history)
213             error("Host '%s' has history of %d entries, but the wanted one is %d entries.", host->hostname, host->rrd_history_entries, history);
214
215         if(host->rrd_memory_mode != mode)
216             error("Host '%s' has memory mode '%s', but the wanted one is '%s'.", host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode));
217     }
218
219     return host;
220 }
221
222 // ----------------------------------------------------------------------------
223 // RRDHOST global / startup initialization
224
225 void rrd_init(char *hostname) {
226     debug(D_RRDHOST, "Initializing localhost with hostname '%s'", hostname);
227
228     localhost = rrdhost_create(hostname,
229             registry_get_this_machine_guid(),
230             os_type,
231             default_rrd_update_every,
232             default_rrd_history_entries,
233             default_rrd_memory_mode,
234             default_health_enabled
235     );
236 }
237
238 // ----------------------------------------------------------------------------
239 // RRDHOST - lock validations
// these are only used when NETDATA_INTERNAL_CHECKS is set
241
242 void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
243     debug(D_RRDHOST, "Checking read lock on host '%s'", host->hostname);
244
245     int ret = pthread_rwlock_trywrlock(&host->rrdhost_rwlock);
246     if(ret == 0)
247         fatal("RRDHOST '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
248 }
249
250 void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
251     debug(D_RRDHOST, "Checking write lock on host '%s'", host->hostname);
252
253     int ret = pthread_rwlock_tryrdlock(&host->rrdhost_rwlock);
254     if(ret == 0)
255         fatal("RRDHOST '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
256 }
257
258 void rrd_check_rdlock_int(const char *file, const char *function, const unsigned long line) {
259     debug(D_RRDHOST, "Checking read lock on all RRDs");
260
261     int ret = pthread_rwlock_trywrlock(&rrd_rwlock);
262     if(ret == 0)
263         fatal("RRDs should be read-locked, but it are not, at function %s() at line %lu of file '%s'", function, line, file);
264 }
265
266 void rrd_check_wrlock_int(const char *file, const char *function, const unsigned long line) {
267     debug(D_RRDHOST, "Checking write lock on all RRDs");
268
269     int ret = pthread_rwlock_tryrdlock(&rrd_rwlock);
270     if(ret == 0)
271         fatal("RRDs should be write-locked, but it are not, at function %s() at line %lu of file '%s'", function, line, file);
272 }
273
274 // ----------------------------------------------------------------------------
275 // RRDHOST - free
276
277 void rrdhost_free(RRDHOST *host) {
278     if(!host) return;
279
280     info("Freeing all memory for host '%s'...", host->hostname);
281
282     rrd_check_wrlock();     // make sure the RRDs are write locked
283     rrdhost_wrlock(host);   // lock this RRDHOST
284
285     // ------------------------------------------------------------------------
286     // release its children resources
287
288     while(host->rrdset_root) rrdset_free(host->rrdset_root);
289
290     while(host->alarms) rrdcalc_free(host, host->alarms);
291     while(host->templates) rrdcalctemplate_free(host, host->templates);
292     health_alarm_log_free(host);
293
294
295     // ------------------------------------------------------------------------
296     // remove it from the indexes
297
298     if(rrdhost_index_del(host) != host)
299         error("RRDHOST '%s' removed from index, deleted the wrong entry.", host->hostname);
300
301
302     // ------------------------------------------------------------------------
303     // unlink it from the host
304
305     if(host == localhost) {
306         localhost = host->next;
307     }
308     else {
309         // find the previous one
310         RRDHOST *h;
311         for(h = localhost; h && h->next != host ; h = h->next) ;
312
313         // bypass it
314         if(h) h->next = host->next;
315         else error("Request to free RRDHOST '%s': cannot find it", host->hostname);
316     }
317
318     // ------------------------------------------------------------------------
319     // free it
320
321     if(host->rrdpush_spawn) {
322         pthread_cancel(host->rrdpush_thread);
323         rrdpush_sender_cleanup(host);
324     }
325
326     freez(host->os);
327     freez(host->cache_dir);
328     freez(host->varlib_dir);
329     freez(host->health_default_exec);
330     freez(host->health_default_recipient);
331     freez(host->health_log_filename);
332     freez(host->hostname);
333     rrdhost_unlock(host);
334     freez(host);
335
336     info("Host memory cleanup completed...");
337 }
338
339 void rrdhost_free_all(void) {
340     rrd_wrlock();
341     while(localhost) rrdhost_free(localhost);
342     rrd_unlock();
343 }
344
345 // ----------------------------------------------------------------------------
346 // RRDHOST - save
347
348 void rrdhost_save(RRDHOST *host) {
349     if(!host) return;
350
351     info("Saving host '%s' database...", host->hostname);
352
353     RRDSET *st;
354     RRDDIM *rd;
355
356     // we get a write lock
357     // to ensure only one thread is saving the database
358     rrdhost_wrlock(host);
359
360     rrdset_foreach_write(st, host) {
361         rrdset_rdlock(st);
362
363         if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
364             debug(D_RRD_STATS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
365             savememory(st->cache_filename, st, st->memsize);
366         }
367
368         rrddim_foreach_read(rd, st) {
369             if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) {
370                 debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
371                 savememory(rd->cache_filename, rd, rd->memsize);
372             }
373         }
374
375         rrdset_unlock(st);
376     }
377
378     rrdhost_unlock(host);
379 }
380
381 void rrdhost_save_all(void) {
382     info("Saving database...");
383
384     rrd_rdlock();
385
386     RRDHOST *host;
387     rrdhost_foreach_read(host)
388         rrdhost_save(host);
389
390     rrd_unlock();
391 }