# buffer size bytes = 1048576
# reconnect delay seconds = 5
# initial clock resync iterations = 60
+ # free orphan hosts after seconds = 3600
# -----------------------------------------------------------------------------
extern RRD_ALGORITHM rrd_algorithm_id(const char *name);
extern const char *rrd_algorithm_name(RRD_ALGORITHM algorithm);
-// ----------------------------------------------------------------------------
-// flags
-
-typedef enum rrddim_flags {
- RRDDIM_FLAG_HIDDEN = 1 << 0, // this dimension will not be offered to callers
- RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS = 1 << 1, // do not offer RESET or OVERFLOW info to callers
- RRDDIM_FLAG_UPDATED = 1 << 2, // the dimension has been updated since the last processing
- RRDDIM_FLAG_EXPOSED = 1 << 3 // when set what have sent this dimension to the central netdata
-} RRDDIM_FLAGS;
-
-#define rrddim_flag_check(rd, flag) ((rd)->flags & flag)
-#define rrddim_flag_set(rd, flag) (rd)->flags |= flag
-#define rrddim_flag_clear(rd, flag) (rd)->flags &= ~flag
-
-
// ----------------------------------------------------------------------------
// RRD FAMILY
typedef struct rrdfamily RRDFAMILY;
+// ----------------------------------------------------------------------------
+// flags
+// use this for configuration flags, not for state control
+// flags are set/unset in a manner that is not thread safe
+// and may lead to missing information.
+
+typedef enum rrddim_flags {
+ RRDDIM_FLAG_HIDDEN = 1 << 0, // this dimension will not be offered to callers
+ RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS = 1 << 1 // do not offer RESET or OVERFLOW info to callers
+} RRDDIM_FLAGS;
+
+#define rrddim_flag_check(rd, flag) ((rd)->flags & flag)
+#define rrddim_flag_set(rd, flag) (rd)->flags |= flag
+#define rrddim_flag_clear(rd, flag) (rd)->flags &= ~flag
+
+
// ----------------------------------------------------------------------------
// RRD DIMENSION - this is a metric
collected_number multiplier; // the multiplier of the collected values
collected_number divisor; // the divider of the collected values
- uint32_t flags; // options and status options for the dimension
+ uint32_t flags; // configuration flags for the dimension
// ------------------------------------------------------------------------
// members for temporary data we need for calculations
char *cache_filename; // the filename we load/save from/to this set
- size_t counter; // the number of times we added values to this rrdim
+ size_t collections_counter; // the number of times we added values to this rrdim
+ size_t unused[10];
+
+ int updated:1; // 1 when the dimension has been updated since the last processing
+ int exposed:1; // 1 when set what have sent this dimension to the central netdata
struct timeval last_collected_time; // when was this dimension last updated
// this is actual date time we updated the last_collected_value
// ----------------------------------------------------------------------------
// RRDSET - this is a chart
+// use this for configuration flags, not for state control
+// flags are set/unset in a manner that is not thread safe
+// and may lead to missing information.
+
typedef enum rrdset_flags {
- RRDSET_FLAG_ENABLED = 1 << 0, // enables or disables a chart
- RRDSET_FLAG_DETAIL = 1 << 1, // if set, the data set should be considered as a detail of another
- // (the master data set should be the one that has the same family and is not detail)
- RRDSET_FLAG_DEBUG = 1 << 2 // enables or disables debugging for a chart
+ RRDSET_FLAG_ENABLED = 1 << 0, // enables or disables a chart
+ RRDSET_FLAG_DETAIL = 1 << 1, // if set, the data set should be considered as a detail of another
+ // (the master data set should be the one that has the same family and is not detail)
+ RRDSET_FLAG_DEBUG = 1 << 2 // enables or disables debugging for a chart
} RRDSET_FLAGS;
#define rrdset_flag_check(st, flag) ((st)->flags & flag)
long current_entry; // the entry that is currently being updated
// it goes around in a round-robin fashion
- uint32_t flags;
+ uint32_t flags; // configuration flags
int gap_when_lost_iterations_above; // after how many lost iterations a gap should be stored
// netdata will interpolate values for gaps lower than this
pthread_rwlock_t rrdset_rwlock; // protects dimensions linked list
- unsigned long counter; // the number of times we added values to this rrd
- unsigned long counter_done; // the number of times we added values to this rrd
+ size_t counter; // the number of times we added values to this database
+ size_t counter_done; // the number of times rrdset_done() has been called
+ size_t unused[10];
uint32_t hash; // a simple hash on the id, to speed up searching
// we first compare hashes, and only if the hashes are equal we do string comparisons
struct rrdhost {
avl avl; // the index of hosts
+ // ------------------------------------------------------------------------
+ // host information
+
char *hostname; // the hostname of this host
uint32_t hash_hostname; // the hostname hash
char machine_guid[GUID_LEN + 1]; // the unique ID of this host
uint32_t hash_machine_guid; // the hash of the unique ID
+ char *os; // the O/S type of the host
int rrd_update_every; // the update frequency of the host
int rrd_history_entries; // the number of history entries for the host's charts
+ RRD_MEMORY_MODE rrd_memory_mode; // the memory more for the charts of this host
+
+ char *cache_dir; // the directory to save RRD cache files
+ char *varlib_dir; // the directory to save health log
+
- int rrdpush_enabled; // 1 when this host sends metrics to another netdata
+ // ------------------------------------------------------------------------
+ // streaming of data to remote hosts - rrdpush
+
+ int rrdpush_enabled:1; // 1 when this host sends metrics to another netdata
char *rrdpush_destination; // where to send metrics to
char *rrdpush_api_key; // the api key at the receiving netdata
- volatile int rrdpush_connected; // 1 when the sender is ready to push metrics
- volatile int rrdpush_spawn; // 1 when the sender thread has been spawn
- volatile int rrdpush_error_shown; // 1 when we have logged a communication error
+ volatile int rrdpush_connected:1; // 1 when the sender is ready to push metrics
+ volatile int rrdpush_spawn:1; // 1 when the sender thread has been spawn
+ volatile int rrdpush_error_shown:1; // 1 when we have logged a communication error
int rrdpush_socket; // the fd of the socket to the remote host, or -1
pthread_t rrdpush_thread; // the sender thread
pthread_mutex_t rrdpush_mutex; // exclusive access to rrdpush_buffer
int rrdpush_pipe[2]; // collector to sender thread communication
BUFFER *rrdpush_buffer; // collector fills it, sender sends them
- int health_enabled; // 1 when this host has health enabled
- time_t health_delay_up_to; // a timestamp to delay alarms processing up to
- RRD_MEMORY_MODE rrd_memory_mode; // the memory more for the charts of this host
- RRDSET *rrdset_root; // the host charts
+ // ------------------------------------------------------------------------
+ // streaming of data from remote hosts - rrdpush
- pthread_rwlock_t rrdhost_rwlock; // lock for this RRDHOST (protects rrdset_root linked list)
+ volatile size_t connected_senders; // when remote hosts are streaming to this
+ // host, this is the counter of connected clients
- avl_tree_lock rrdset_root_index; // the host's charts index (by id)
- avl_tree_lock rrdset_root_index_name; // the host's charts index (by name)
+ time_t senders_disconnected_time; // the time the last sender was disconnected
- avl_tree_lock rrdfamily_root_index; // the host's chart families index
- avl_tree_lock variables_root_index; // the host's chart variables index
+ // ------------------------------------------------------------------------
+ // health monitoring options
- char *cache_dir; // the directory to save RRD cache files
- char *varlib_dir; // the directory to save health log
+ int health_enabled:1; // 1 when this host has health enabled
+ time_t health_delay_up_to; // a timestamp to delay alarms processing up to
+ char *health_default_exec; // the full path of the alarms notifications program
+ char *health_default_recipient; // the default recipient for all alarms
+ char *health_log_filename; // the alarms event log filename
+ size_t health_log_entries_written; // the number of alarm events writtern to the alarms event log
+ FILE *health_log_fp; // the FILE pointer to the open alarms event log file
// all RRDCALCs are primarily allocated and linked here
// RRDCALCs may be linked to charts at any point
// are created or renamed, that match them
RRDCALCTEMPLATE *templates;
- char *os; // the O/S type of the host
- volatile size_t use_counter; // when remote hosts are streaming to this
- // host, this is the counter of connected clients
- // health / alarm settings
- char *health_default_exec;
- char *health_default_recipient;
- char *health_log_filename;
- size_t health_log_entries_written;
- FILE *health_log_fp;
+ // ------------------------------------------------------------------------
+ // the charts of the host
+
+ RRDSET *rrdset_root; // the host charts
+
+
+ // ------------------------------------------------------------------------
+ // locks
+
+ pthread_rwlock_t rrdhost_rwlock; // lock for this RRDHOST (protects rrdset_root linked list)
+
+ avl_tree_lock rrdset_root_index; // the host's charts index (by id)
+ avl_tree_lock rrdset_root_index_name; // the host's charts index (by name)
+
+ avl_tree_lock rrdfamily_root_index; // the host's chart families index
+ avl_tree_lock variables_root_index; // the host's chart variables index
struct rrdhost *next;
};
// ----------------------------------------------------------------------------
+extern time_t rrdhost_free_orphan_time;
+
extern void rrd_init(char *hostname);
-extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash);
+extern RRDHOST *rrdhost_find_guid(const char *guid, uint32_t hash);
extern RRDHOST *rrdhost_find_or_create(
const char *hostname
, const char *guid
extern void rrdhost_free_all(void);
extern void rrdhost_save_all(void);
+extern void rrdhost_cleanup_remote_stale(RRDHOST *protected);
extern void rrdhost_free(RRDHOST *host);
extern void rrdhost_save(RRDHOST *host);
// for each dimension
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rd->counter) {
+ if(rd->collections_counter) {
char dimension[PROMETHEUS_ELEMENT_MAX + 1];
prometheus_name_copy(dimension, rd->id, PROMETHEUS_ELEMENT_MAX);
// for each dimension
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rd->counter) {
+ if(rd->collections_counter) {
char dimension[SHELL_ELEMENT_MAX + 1];
shell_name_copy(dimension, rd->id, SHELL_ELEMENT_MAX);
rd->update_every = st->update_every;
// prevent incremental calculation spikes
- rd->counter = 0;
+ rd->collections_counter = 0;
+ rd->updated = 0;
rd->flags = 0x00000000;
rd->calculated_value = 0;
now_realtime_timeval(&rd->last_collected_time);
rd->collected_value = value;
- rrddim_flag_set(rd, RRDDIM_FLAG_UPDATED);
- rd->counter++;
+ rd->updated = 1;
+
+ rd->collections_counter++;
// fprintf(stderr, "%s.%s %llu " COLLECTED_NUMBER_FORMAT " dt %0.6f" " rate " CALCULATED_NUMBER_FORMAT "\n", st->name, rd->name, st->usec_since_last_update, value, (float)((double)st->usec_since_last_update / (double)1000000), (calculated_number)((value - rd->last_collected_value) * (calculated_number)rd->multiplier / (calculated_number)rd->divisor * 1000000.0 / (calculated_number)st->usec_since_last_update));
pthread_rwlock_t rrd_rwlock = PTHREAD_RWLOCK_INITIALIZER;
+time_t rrdhost_free_orphan_time = 3600;
// ----------------------------------------------------------------------------
// RRDHOST index
.rwlock = AVL_LOCK_INITIALIZER
};
-RRDHOST *rrdhost_find(const char *guid, uint32_t hash) {
+RRDHOST *rrdhost_find_guid(const char *guid, uint32_t hash) {
debug(D_RRDHOST, "Searching in index for host with guid '%s'", guid);
RRDHOST tmp;
) {
debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid);
- RRDHOST *host = rrdhost_find(guid, 0);
+ RRDHOST *host = rrdhost_find_guid(guid, 0);
if(!host) {
host = rrdhost_create(
hostname
error("Host '%s' has memory mode '%s', but the wanted one is '%s'.", host->hostname, rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode));
}
+ rrdhost_cleanup_remote_stale(host);
+
return host;
}
+void rrdhost_cleanup_remote_stale(RRDHOST *protected) {
+ rrd_wrlock();
+
+ RRDHOST *h;
+ rrdhost_foreach_write(h) {
+ if(h != protected
+ && h != localhost
+ && !h->connected_senders
+ && h->senders_disconnected_time + rrdhost_free_orphan_time > now_realtime_sec()) {
+ info("Host '%s' with machine guid '%s' is obsolete - cleaning up.", h->hostname, h->machine_guid);
+ rrdhost_save(h);
+ rrdhost_free(h);
+ break;
+ }
+ }
+
+ rrd_unlock();
+}
+
// ----------------------------------------------------------------------------
// RRDHOST global / startup initialization
default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
+ rrdhost_free_orphan_time = appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "free orphan hosts after seconds", rrdhost_free_orphan_time);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
static inline int need_to_send_chart_definition(RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
+ if(!rd->exposed)
return 1;
return 0;
, rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
, rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
+ rd->exposed = 1;
}
}
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
+ if(rd->updated && rd->exposed)
buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
, rd->id
, rd->collected_value
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
+ rd->exposed = 0;
rrdset_unlock(st);
}
}
rrdhost_wrlock(host);
- host->use_counter++;
+ host->connected_senders++;
if(health_enabled != CONFIG_BOOLEAN_NO)
host->health_delay_up_to = now_realtime_sec() + alarms_delay;
rrdhost_unlock(host);
error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count);
rrdhost_wrlock(host);
- host->use_counter--;
- if(!host->use_counter) {
+ host->connected_senders--;
+ if(!host->connected_senders) {
if(health_enabled == CONFIG_BOOLEAN_AUTO)
host->health_enabled = 0;
+ host->senders_disconnected_time = now_realtime_sec();
+
rrdpush_sender_thread_stop(host);
}
rrdhost_unlock(host);
rrddim_foreach_read(rd, st) {
rd->last_collected_time.tv_sec = 0;
rd->last_collected_time.tv_usec = 0;
- rd->counter = 0;
+ rd->collections_counter = 0;
memset(rd->values, 0, rd->entries * sizeof(storage_number));
}
}
st->collected_total = 0;
rrddim_foreach_read(rd, st) {
dimensions++;
- if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
+ if(likely(rd->updated))
st->collected_total += rd->collected_value;
}
// at this stage we do not interpolate anything
rrddim_foreach_read(rd, st) {
- if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) {
+ if(unlikely(!rd->updated)) {
rd->calculated_value = 0;
continue;
}
break;
case RRD_ALGORITHM_INCREMENTAL:
- if(unlikely(rd->counter <= 1)) {
+ if(unlikely(rd->collections_counter <= 1)) {
rd->calculated_value = 0;
continue;
}
break;
case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
- if(unlikely(rd->counter <= 1)) {
+ if(unlikely(rd->collections_counter <= 1)) {
rd->calculated_value = 0;
continue;
}
continue;
}
- if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
+ if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
rd->last_stored_value = new_value;
st->last_collected_total = st->collected_total;
rrddim_foreach_read(rd, st) {
- if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
+ if(unlikely(!rd->updated))
continue;
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
rd->calculated_value = 0;
rd->collected_value = 0;
- rrddim_flag_clear(rd, RRDDIM_FLAG_UPDATED);
+ rd->updated = 0;
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_RRD_STATS, "%s/%s: END "
for(s = filename; *s ;s++) {
if( !isalnum(*s) && *s != '/' && *s != '.' && *s != '-' && *s != '_') {
debug(D_WEB_CLIENT_ACCESS, "%llu: File '%s' is not acceptable.", w->id, filename);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_sprintf(w->response.data, "Filename contains invalid characters: ");
buffer_strcat_htmlescape(w->response.data, filename);
return 400;
// if the filename contains a .. refuse to serve it
if(strstr(filename, "..") != 0) {
debug(D_WEB_CLIENT_ACCESS, "%llu: File '%s' is not acceptable.", w->id, filename);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "Relative filenames are not supported: ");
buffer_strcat_htmlescape(w->response.data, filename);
return 400;
struct stat stat;
if(lstat(webfilename, &stat) != 0) {
debug(D_WEB_CLIENT_ACCESS, "%llu: File '%s' is not found.", w->id, webfilename);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "File does not exist, or is not accessible: ");
buffer_strcat_htmlescape(w->response.data, webfilename);
return 404;
// check if the file is owned by expected user
if(stat.st_uid != web_files_uid()) {
error("%llu: File '%s' is owned by user %u (expected user %u). Access Denied.", w->id, webfilename, stat.st_uid, web_files_uid());
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "Access to file is not permitted: ");
buffer_strcat_htmlescape(w->response.data, webfilename);
return 403;
// check if the file is owned by expected group
if(stat.st_gid != web_files_gid()) {
error("%llu: File '%s' is owned by group %u (expected group %u). Access Denied.", w->id, webfilename, stat.st_gid, web_files_gid());
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "Access to file is not permitted: ");
buffer_strcat_htmlescape(w->response.data, webfilename);
return 403;
if((stat.st_mode & S_IFMT) != S_IFREG) {
error("%llu: File '%s' is not a regular file. Access Denied.", w->id, webfilename);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "Access to file is not permitted: ");
buffer_strcat_htmlescape(w->response.data, webfilename);
return 403;
if(errno == EBUSY || errno == EAGAIN) {
error("%llu: File '%s' is busy, sending 307 Moved Temporarily to force retry.", w->id, webfilename);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_sprintf(w->response.header, "Location: /" WEB_PATH_FILE "/%s\r\n", filename);
buffer_strcat(w->response.data, "File is currently busy, please try again later: ");
buffer_strcat_htmlescape(w->response.data, webfilename);
}
else {
error("%llu: Cannot open file '%s'.", w->id, webfilename);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "Cannot open file: ");
buffer_strcat_htmlescape(w->response.data, webfilename);
return 404;
return web_client_api_request_v1(host, w, url);
else {
buffer_flush(w->response.data);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "Unsupported API version: ");
buffer_strcat_htmlescape(w->response.data, tok);
return 404;
}
buffer_flush(w->response.data);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "This netdata does not maintain a database for host: ");
buffer_strcat_htmlescape(w->response.data, tok?tok:"");
return 404;
RRDSET *st = rrdset_find_byname(host, tok);
if(!st) st = rrdset_find(host, tok);
if(!st) {
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_strcat(w->response.data, "Chart is not found: ");
buffer_strcat_htmlescape(w->response.data, tok);
debug(D_WEB_CLIENT_ACCESS, "%llu: %s is not found.", w->id, tok);
else
rrdset_flag_set(st, RRDSET_FLAG_DEBUG);
+ w->response.data->contenttype = CT_TEXT_HTML;
buffer_sprintf(w->response.data, "Chart has now debug %s: ", rrdset_flag_check(st, RRDSET_FLAG_DEBUG)?"enabled":"disabled");
buffer_strcat_htmlescape(w->response.data, tok);
debug(D_WEB_CLIENT_ACCESS, "%llu: debug for %s is %s.", w->id, tok, rrdset_flag_check(st, RRDSET_FLAG_DEBUG)?"enabled":"disabled");