if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
error("Cannot set pthread cancel state to DISABLE.");
+ rrd_rdlock();
RRDHOST *host;
- for(host = localhost; host ; host = host->next) {
- // for each host
-
+ rrdhost_foreach_read(host) {
rrdhost_rdlock(host);
RRDSET *st;
- for(st = host->rrdset_root; st; st = st->next) {
- // for each chart
-
+ rrdset_foreach_read(st, host) {
rrdset_rdlock(st);
RRDDIM *rd;
- for(rd = st->dimensions; rd; rd = rd->next) {
- // for each dimension
-
+ rrddim_foreach_read(rd, st) {
if(rd->last_collected_time.tv_sec >= after)
chart_buffered_metrics += backend_request_formatter(b, prefix, host, (host == localhost)?hostname:host->hostname, st, rd, after, before, options);
}
-
rrdset_unlock(st);
}
-
rrdhost_unlock(host);
}
+ rrd_unlock();
if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
t->flags |= HEALTH_ENTRY_FLAG_UPDATED;
}
+ rrdhost_rdlock(host);
// reset all thresholds to all charts
RRDSET *st;
- for(st = host->rrdset_root; st ; st = st->next) {
+ rrdset_foreach_read(st, host) {
st->green = NAN;
st->red = NAN;
}
+ rrdhost_unlock(host);
// load the new alarms
rrdhost_wrlock(host);
health_readdir(host, path);
- rrdhost_unlock(host);
// link the loaded alarms to their charts
- for(st = host->rrdset_root; st ; st = st->next) {
- rrdhost_wrlock(host);
-
+ rrdset_foreach_write(st, host) {
rrdsetcalc_link_matching(st);
rrdcalctemplate_link_matching(st);
-
- rrdhost_unlock(host);
}
+
+ rrdhost_unlock(host);
}
void health_reload(void) {
- RRDHOST *host;
- for(host = localhost; host ; host = host->next)
+ rrd_rdlock();
+
+ RRDHOST *host;
+ rrdhost_foreach_read(host)
health_reload_host(host);
+
+ rrd_unlock();
}
// ----------------------------------------------------------------------------
if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0))
error("Cannot set pthread cancel state to DISABLE.");
+ rrd_rdlock();
+
RRDHOST *host;
- for(host = localhost; host ; host = host->next) {
+ rrdhost_foreach_read(host) {
if(unlikely(!host->health_enabled)) continue;
rrdhost_rdlock(host);
} /* host loop */
+ rrd_unlock();
+
if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0))
error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
RRDSET *st = NULL;
uint32_t hash;
- while(likely(fgets(line, PLUGINSD_LINE_MAX, fp) != NULL)) {
+ errno = 0;
+ clearerr(fp);
+
+ if(unlikely(fileno(fp) == -1)) {
+ error("PLUGINSD: %s: file is not a valid stream.", cd->fullfilename);
+ goto cleanup;
+ }
+
+ while(!ferror(fp)) {
+ if(unlikely(netdata_exit)) break;
+
+ char *r = fgets(line, PLUGINSD_LINE_MAX, fp);
+ if(unlikely(!r)) {
+ error("PLUGINSD: %s : read failed.", cd->fullfilename);
+ break;
+ }
+
if(unlikely(netdata_exit)) break;
line[PLUGINSD_LINE_MAX] = '\0';
}
}
+cleanup:
cd->enabled = enabled;
if(likely(count)) {
};
typedef struct rrddim RRDDIM;
+// ----------------------------------------------------------------------------
+// these loop macros make sure the linked list is accessed with the right lock
+
+#define rrddim_foreach_read(rd, st) \
+ for(rd = st->dimensions, rrdset_check_rdlock(st); rd ; rd = rd->next)
+
+#define rrddim_foreach_write(rd, st) \
+ for(rd = st->dimensions, rrdset_check_wrlock(st); rd ; rd = rd->next)
+
// ----------------------------------------------------------------------------
// RRDSET - this is a chart
char *cache_dir; // the directory to store dimensions
char cache_filename[FILENAME_MAX+1]; // the filename to store this set
- pthread_rwlock_t rrdset_rwlock;
+ pthread_rwlock_t rrdset_rwlock; // protects dimensions linked list
unsigned long counter; // the number of times we added values to this rrd
unsigned long counter_done; // the number of times we added values to this rrd
#define rrdset_wrlock(st) pthread_rwlock_wrlock(&((st)->rrdset_rwlock))
#define rrdset_unlock(st) pthread_rwlock_unlock(&((st)->rrdset_rwlock))
+// ----------------------------------------------------------------------------
+// these loop macros make sure the linked list is accessed with the right lock
+
+#define rrdset_foreach_read(st, host) \
+ for(st = host->rrdset_root, rrdhost_check_rdlock(host); st ; st = st->next)
+
+#define rrdset_foreach_write(st, host) \
+ for(st = host->rrdset_root, rrdhost_check_wrlock(host); st ; st = st->next)
+
+
// ----------------------------------------------------------------------------
// RRD HOST
RRDSET *rrdset_root; // the host charts
- pthread_rwlock_t rrdhost_rwlock; // lock for this RRDHOST
+ pthread_rwlock_t rrdhost_rwlock; // lock for this RRDHOST (protects rrdset_root linked list)
avl_tree_lock rrdset_root_index; // the host's charts index (by id)
avl_tree_lock rrdset_root_index_name; // the host's charts index (by name)
#define rrdhost_wrlock(h) pthread_rwlock_wrlock(&((h)->rrdhost_rwlock))
#define rrdhost_unlock(h) pthread_rwlock_unlock(&((h)->rrdhost_rwlock))
+// ----------------------------------------------------------------------------
+// these loop macros make sure the linked list is accessed with the right lock
+
+#define rrdhost_foreach_read(var) \
+ for(var = localhost, rrd_check_rdlock(); var ; var = var->next)
+
+#define rrdhost_foreach_write(var) \
+ for(var = localhost, rrd_check_wrlock(); var ; var = var->next)
+
+
+// ----------------------------------------------------------------------------
+// global lock for all RRDHOSTs
+
+extern pthread_rwlock_t rrd_rwlock;
+#define rrd_rdlock() pthread_rwlock_rdlock(&rrd_rwlock)
+#define rrd_wrlock() pthread_rwlock_wrlock(&rrd_rwlock)
+#define rrd_unlock() pthread_rwlock_unlock(&rrd_rwlock)
+
+// ----------------------------------------------------------------------------
+
extern void rrd_init(char *hostname);
extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash);
extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid);
#ifdef NETDATA_INTERNAL_CHECKS
+extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
+extern void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
+extern void rrdset_check_rdlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line);
+extern void rrdset_check_wrlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line);
+extern void rrd_check_rdlock_int(const char *file, const char *function, const unsigned long line);
+extern void rrd_check_wrlock_int(const char *file, const char *function, const unsigned long line);
+
#define rrdhost_check_rdlock(host) rrdhost_check_rdlock_int(host, __FILE__, __FUNCTION__, __LINE__)
#define rrdhost_check_wrlock(host) rrdhost_check_wrlock_int(host, __FILE__, __FUNCTION__, __LINE__)
+#define rrdset_check_rdlock(st) rrdset_check_rdlock_int(st, __FILE__, __FUNCTION__, __LINE__)
+#define rrdset_check_wrlock(st) rrdset_check_wrlock_int(st, __FILE__, __FUNCTION__, __LINE__)
#define rrd_check_rdlock() rrd_check_rdlock_int(__FILE__, __FUNCTION__, __LINE__)
#define rrd_check_wrlock() rrd_check_wrlock_int(__FILE__, __FUNCTION__, __LINE__)
+
#else
#define rrdhost_check_rdlock(host) (void)0
#define rrdhost_check_wrlock(host) (void)0
+#define rrdset_check_rdlock(host) (void)0
+#define rrdset_check_wrlock(host) (void)0
#define rrd_check_rdlock() (void)0
#define rrd_check_wrlock() (void)0
#endif
-extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
-extern void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
-
-// ----------------------------------------------------------------------------
-// global lock for all RRDHOSTs
-
-extern pthread_rwlock_t rrd_rwlock;
-#define rrd_rdlock() pthread_rwlock_rdlock(&rrd_rwlock)
-#define rrd_wrlock() pthread_rwlock_wrlock(&rrd_rwlock)
-#define rrd_unlock() pthread_rwlock_unlock(&rrd_rwlock)
-
-
// ----------------------------------------------------------------------------
// RRDSET functions
size_t dimensions = 0;
RRDDIM *rd;
- for(rd = st->dimensions; rd ; rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
if(rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) continue;
memory += rd->memsize;
, host->rrd_history_entries
);
+ c = 0;
rrdhost_rdlock(host);
- for(st = host->rrdset_root, c = 0; st ; st = st->next) {
+ rrdset_foreach_read(st, host) {
if(rrdset_flag_check(st, RRDSET_FLAG_ENABLED) && st->dimensions) {
if(c) buffer_strcat(wb, ",");
buffer_strcat(wb, "\n\t\t\"");
// for each chart
RRDSET *st;
- for(st = host->rrdset_root; st ; st = st->next) {
+ rrdset_foreach_read(st, host) {
char chart[PROMETHEUS_ELEMENT_MAX + 1];
prometheus_name_copy(chart, st->id, PROMETHEUS_ELEMENT_MAX);
// for each dimension
RRDDIM *rd;
- for(rd = st->dimensions; rd ; rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
if(rd->counter) {
char dimension[PROMETHEUS_ELEMENT_MAX + 1];
prometheus_name_copy(dimension, rd->id, PROMETHEUS_ELEMENT_MAX);
// for each chart
RRDSET *st;
- for(st = host->rrdset_root; st ; st = st->next) {
+ rrdset_foreach_read(st, host) {
calculated_number total = 0.0;
char chart[SHELL_ELEMENT_MAX + 1];
shell_name_copy(chart, st->id, SHELL_ELEMENT_MAX);
// for each dimension
RRDDIM *rd;
- for(rd = st->dimensions; rd ; rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
if(rd->counter) {
char dimension[SHELL_ELEMENT_MAX + 1];
shell_name_copy(dimension, rd->id, SHELL_ELEMENT_MAX);
unsigned long memory = st->memsize;
RRDDIM *rd;
- for(rd = st->dimensions; rd ; rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
memory += rd->memsize;
void rrd_stats_all_json(RRDHOST *host, BUFFER *wb)
{
unsigned long memory = 0;
- long c;
+ long c = 0;
RRDSET *st;
buffer_strcat(wb, RRD_GRAPH_JSON_HEADER);
rrdhost_rdlock(host);
-
- for(st = host->rrdset_root, c = 0; st ; st = st->next) {
+ rrdset_foreach_read(st, host) {
if(rrdset_flag_check(st, RRDSET_FLAG_ENABLED) && st->dimensions) {
if(c) buffer_strcat(wb, ",\n");
memory += rrd_stats_one_json(st, NULL, wb);
c++;
}
}
+ rrdhost_unlock(host);
buffer_sprintf(wb, "\n\t],\n"
"\t\"hostname\": \"%s\",\n"
, host->rrd_history_entries
, memory
);
-
- rrdhost_unlock(host);
}
*/
void rrdr_disable_not_selected_dimensions(RRDR *r, uint32_t options, const char *dims) {
+ rrdset_check_rdlock(r->st);
+
if(unlikely(!dims || !*dims)) return;
char b[strlen(dims) + 1];
uint32_t rrdr_check_options(RRDR *r, uint32_t options, const char *dims)
{
+ rrdset_check_rdlock(r->st);
+
(void)dims;
if(options & RRDR_OPTION_NONZERO) {
void rrdr_json_wrapper_begin(RRDR *r, BUFFER *wb, uint32_t format, uint32_t options, int string_value)
{
+ rrdset_check_rdlock(r->st);
+
long rows = rrdr_rows(r);
long c, i;
RRDDIM *rd;
static void rrdr2json(RRDR *r, BUFFER *wb, uint32_t options, int datatable)
{
+ rrdset_check_rdlock(r->st);
+
//info("RRD2JSON(): %s: BEGIN", r->st->id);
int row_annotations = 0, dates, dates_with_new = 0;
char kq[2] = "", // key quote
static void rrdr2csv(RRDR *r, BUFFER *wb, uint32_t options, const char *startline, const char *separator, const char *endline, const char *betweenlines)
{
+ rrdset_check_rdlock(r->st);
+
//info("RRD2CSV(): %s: BEGIN", r->st->id);
long c, i;
RRDDIM *d;
}
inline static calculated_number rrdr2value(RRDR *r, long i, uint32_t options, int *all_values_are_null) {
+ rrdset_check_rdlock(r->st);
+
long c;
RRDDIM *d;
rrdr_lock_rrdset(r);
RRDDIM *rd;
- for(rd = st->dimensions ; rd ; rd = rd->next) r->d++;
+ rrddim_foreach_read(rd, st) r->d++;
r->n = n;
// initialize them
RRDDIM *rd;
long c;
+ rrdset_check_rdlock(st);
for( rd = st->dimensions, c = 0 ; rd && c < dimensions ; rd = rd->next, c++) {
last_values[c] = 0;
group_values[c] = (group_method == GROUP_MAX || group_method == GROUP_MIN)?NAN:0;
int dimensions = 0;
RRDDIM *rd;
- for( rd = st->dimensions ; rd ; rd = rd->next) dimensions++;
+ rrddim_foreach_read(rd, st) dimensions++;
if(!dimensions) {
rrdset_unlock(st);
buffer_strcat(wb, "No dimensions yet.");
// link it to its chart
RRDSET *st;
- for(st = host->rrdset_root; st ; st = st->next) {
+ rrdset_foreach_read(st, host) {
if(rrdcalc_is_matching_this_rrdset(rc, st)) {
rrdsetcalc_link(st, rc);
break;
// to ensure only one thread is saving the database
rrdhost_wrlock(host);
- for(st = host->rrdset_root; st ; st = st->next) {
+ rrdset_foreach_write(st, host) {
rrdset_rdlock(st);
if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
savememory(st->cache_filename, st, st->memsize);
}
- for(rd = st->dimensions; rd ; rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) {
debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
savememory(rd->cache_filename, rd, rd->memsize);
rrd_rdlock();
RRDHOST *host;
- for(host = localhost; host ; host = host->next)
+ rrdhost_foreach_read(host)
rrdhost_save(host);
rrd_unlock();
static inline int need_to_send_chart_definitions(RRDSET *st) {
RRDDIM *rd;
- for(rd = st->dimensions; rd ;rd = rd->next)
+ rrddim_foreach_read(rd, st)
if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && !rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
return 1;
);
RRDDIM *rd;
- for(rd = st->dimensions; rd ;rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
, rd->id
, rd->name
, rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
, rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
+ rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
}
}
buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, st->usec_since_last_update);
RRDDIM *rd;
- for(rd = st->dimensions; rd ;rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))
buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
, rd->id
static void reset_all_charts(void) {
rrd_rdlock();
- RRDHOST *h;
- for(h = localhost; h ;h = h->next) {
+ RRDHOST *host;
+ rrdhost_foreach_read(host) {
+ rrdhost_rdlock(host);
+
RRDSET *st;
- for(st = h->rrdset_root ; st ; st = st->next) {
+ rrdset_foreach_read(st, host) {
rrdset_rdlock(st);
RRDDIM *rd;
- for(rd = st->dimensions; rd ;rd = rd->next)
+ rrddim_foreach_read(rd, st)
rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
rrdset_unlock(st);
}
+ rrdhost_unlock(host);
}
+ rrd_unlock();
last_host = NULL;
-
- rrd_unlock();
}
void rrdset_done_push(RRDSET *st) {
- if(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))
+ if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED) || !rrdpush_buffer))
return;
rrdpush_lock();
rrdset_rdlock(st);
- if(st->rrdhost != last_host)
- buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->hostname, st->rrdhost->machine_guid);
+ if(st->rrdhost != last_host) {
+ buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
+ last_host = st->rrdhost;
+ }
if(need_to_send_chart_definitions(st))
send_chart_definitions(st);
size_t begin = 0;
size_t max_size = 1024 * 1024;
size_t reconnects_counter = 0;
+ size_t sent_bytes = 0;
+ size_t sent_connection = 0;
int sock = -1;
char buffer[1];
for(;;) {
if(unlikely(sock == -1)) {
+ info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
if(unlikely(sock != -1)) {
- if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
- error("Cannot set non-blocking mode for socket.");
+ info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
- buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s\r\n\r\n", config_get("global", "central netdata api key", ""));
- reset_all_charts();
+ if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+ error("PUSH: cannot set non-blocking mode for socket.");
}
+ else
+ error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+
+ rrdpush_lock();
+ if(buffer_strlen(rrdpush_buffer))
+ error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
+
+ buffer_flush(rrdpush_buffer);
+ buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), VERSION);
+ reset_all_charts();
+ rrdpush_unlock();
+ sent_connection = 0;
}
if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
- error("Cannot read from internal pipe.");
+ error("PUSH: Cannot read from internal pipe.");
sleep(1);
}
- if(likely(sock != -1)) {
+ if(likely(sock != -1 && begin < rrdpush_buffer->len)) {
+ // fprintf(stderr, "PUSH BEGIN\n");
+ // fwrite(&rrdpush_buffer->buffer[begin], 1, rrdpush_buffer->len - begin, stderr);
+ // fprintf(stderr, "\nPUSH END\n");
+
rrdpush_lock();
- ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len, MSG_DONTWAIT);
+ ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len - begin, MSG_DONTWAIT);
if(ret == -1) {
- error("Failed to send metrics to central netdata at %s", central_netdata_to_push_data);
- close(sock);
- sock = -1;
+ if(errno != EAGAIN) {
+ error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
+ close(sock);
+ sock = -1;
+ }
}
else {
+ sent_connection += ret;
+ sent_bytes += ret;
begin += ret;
if(begin == rrdpush_buffer->len) {
buffer_flush(rrdpush_buffer);
// protection from overflow
if(rrdpush_buffer->len > max_size) {
- rrdpush_lock();
-
- error("Discarding %zu bytes of metrics data, because we cannot connect to central netdata at %s"
- , buffer_strlen(rrdpush_buffer), central_netdata_to_push_data);
-
- buffer_flush(rrdpush_buffer);
-
+ errno = 0;
+ error("PUSH: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
if(sock != -1) {
close(sock);
sock = -1;
}
-
- rrdpush_unlock();
}
}
-cleanup:
debug(D_WEB_CLIENT, "Central netdata push thread exits.");
if(sock != -1)
close(sock);
#define RRD_DEFAULT_GAP_INTERPOLATIONS 1
+void rrdset_check_rdlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) {
+ debug(D_RRD_CALLS, "Checking read lock on chart '%s'", st->id);
+
+ int ret = pthread_rwlock_trywrlock(&st->rrdset_rwlock);
+ if(ret == 0)
+ fatal("RRDSET '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file);
+}
+
+void rrdset_check_wrlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) {
+ debug(D_RRD_CALLS, "Checking write lock on chart '%s'", st->id);
+
+ int ret = pthread_rwlock_tryrdlock(&st->rrdset_rwlock);
+ if(ret == 0)
+ fatal("RRDSET '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file);
+}
+
+
// ----------------------------------------------------------------------------
// RRDSET index
rrdset_wrlock(st);
RRDDIM *rd;
- for(rd = st->dimensions; rd ;rd = rd->next)
+ rrddim_foreach_write(rd, st)
rrddimvar_rename_all(rd);
rrdset_unlock(st);
st->counter_done = 0;
RRDDIM *rd;
- for(rd = st->dimensions; rd ; rd = rd->next) {
+ rrddim_foreach_read(rd, st) {
rd->last_collected_time.tv_sec = 0;
rd->last_collected_time.tv_usec = 0;
rd->counter = 0;
st->counter_done++;
// calculate totals and count the dimensions
- int dimensions;
+ int dimensions = 0;
st->collected_total = 0;
- for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ )
+ rrddim_foreach_read(rd, st) {
+ dimensions++;
if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
st->collected_total += rd->collected_value;
+ }
uint32_t storage_flags = SN_EXISTS;
// process all dimensions to calculate their values
// based on the collected figures only
// at this stage we do not interpolate anything
- for( rd = st->dimensions ; rd ; rd = rd->next ) {
+ rrddim_foreach_read(rd, st) {
if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) {
rd->calculated_value = 0;
st->last_updated.tv_sec = (time_t) (next_store_ut / USEC_PER_SEC);
st->last_updated.tv_usec = 0;
- for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
+ rrddim_foreach_read(rd, st) {
calculated_number new_value;
switch(rd->algorithm) {
st->last_collected_total = st->collected_total;
- for( rd = st->dimensions; rd ; rd = rd->next ) {
+ rrddim_foreach_read(rd, st) {
if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
continue;
}
int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
- info("STREAM request from client '%s:%s' for host '%s'", w->client_ip, w->client_port, host->hostname);
+ info("STREAM request from client '%s:%s', starting as host '%s'", w->client_ip, w->client_port, host->hostname);
char *key = NULL;
}
if(!key || !*key) {
+ error("STREAM request from client '%s:%s', without an API key. Forbidding access.", w->client_ip, w->client_port);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "You need an API key for this request.");
- error("STREAM request from client '%s:%s', without an API key. Forbidding access.", w->client_ip, w->client_port);
return 401;
}
if(!validate_stream_api_key(key)) {
+ error("STREAM request from client '%s:%s': API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your API key is not permitted access.");
- error("STREAM request from client '%s:%s': API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
return 401;
}
}
// call the plugins.d processor to receive the metrics
+ info("STREAM connecting client '%s:%s' to plugins.d.", w->client_ip, w->client_port);
size_t count = pluginsd_process(host, &cd, fp, 1);
error("STREAM from '%s:%s': client disconnected.", w->client_ip, w->client_port);
if(unlikely(hash == hash_localhost && !strcmp(tok, "localhost")))
return web_client_process_url(localhost, w, url);
+ rrd_rdlock();
RRDHOST *h;
- for(h = localhost; h; h = h->next) {
+ rrdhost_foreach_read(h) {
if(unlikely((hash == h->hash_hostname && !strcmp(tok, h->hostname)) ||
- (hash == h->hash_machine_guid && !strcmp(tok, h->machine_guid))))
+ (hash == h->hash_machine_guid && !strcmp(tok, h->machine_guid)))) {
+ rrd_unlock();
return web_client_process_url(h, w, url);
+ }
}
+ rrd_unlock();
}
buffer_flush(w->response.data);
debug(D_WEB_CLIENT_ACCESS, "%llu: Sending list of RRD_STATS...", w->id);
buffer_flush(w->response.data);
- RRDSET *st = host->rrdset_root;
+ RRDSET *st;
- for ( ; st ; st = st->next )
- buffer_sprintf(w->response.data, "%s\n", st->name);
+ rrdhost_rdlock(host);
+ rrdset_foreach_read(st, host) buffer_sprintf(w->response.data, "%s\n", st->name);
+ rrdhost_unlock(host);
return 200;
}