appconfig_wrlock(root);
for(co = root->sections; co ; co = co->next) {
- if(!strcmp(co->name, "global") ||
- !strcmp(co->name, "plugins") ||
- !strcmp(co->name, "registry") ||
- !strcmp(co->name, "health") ||
- !strcmp(co->name, "backend"))
+ if(!strcmp(co->name, "global")
+ || !strcmp(co->name, "plugins")
+ || !strcmp(co->name, "registry")
+ || !strcmp(co->name, "health")
+ || !strcmp(co->name, "backend")
+ || !strcmp(co->name, "stream")
+ )
pri = 0;
else if(!strncmp(co->name, "plugin:", 7)) pri = 1;
else pri = 2;
// ------------------------------------------------------------------------
// collect configuration options
- if(central_netdata_to_push_data) {
+ if(rrdpush_exclusive) {
info("Backend is disabled - use the central netdata");
goto cleanup;
}
usec_t start_ut = now_monotonic_usec();
size_t reconnects = 0;
- sock = connect_to_one_of(destination, default_port, &timeout, &reconnects);
+ sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
chart_backend_reconnects += reconnects;
chart_backend_latency += now_monotonic_usec() - start_ut;
extern char *netdata_configured_varlib_dir;
extern char *netdata_configured_home_dir;
extern char *netdata_configured_host_prefix;
-extern char *central_netdata_to_push_data;
extern void netdata_fix_chart_id(char *s);
extern void netdata_fix_chart_name(char *s);
void health_init(void) {
debug(D_HEALTH, "Health configuration initializing");
- if(!central_netdata_to_push_data) {
+ if(!rrdpush_exclusive) {
if(!(default_health_enabled = config_get_boolean("health", "enabled", 1))) {
debug(D_HEALTH, "Health is disabled.");
return;
extern void *cgroups_main(void *ptr);
-char *central_netdata_to_push_data = NULL;
-
void netdata_cleanup_and_exit(int ret) {
netdata_exit = 1;
void web_server_threading_selection(void) {
int multi_threaded = 0;
int single_threaded = 0;
- int central_thread = 0;
+ int rrdpush_thread = 1;
+ int backends_thread = 1;
- if(!central_netdata_to_push_data) {
- multi_threaded = config_get_boolean("global", "multi threaded web server", 1);
- single_threaded = !multi_threaded;
+ if(rrdpush_exclusive) {
+ backends_thread = 0;
+ info("Web servers and backends thread are disabled - use the central netdata.");
}
else {
- central_thread = 1;
- info("Web servers are disabled - use the central netdata.");
+ multi_threaded = config_get_boolean("global", "multi threaded web server", 1);
+ single_threaded = !multi_threaded;
}
int i;
static_threads[i].enabled = single_threaded;
if(static_threads[i].start_routine == central_netdata_push_thread)
- static_threads[i].enabled = central_thread;
+ static_threads[i].enabled = rrdpush_thread;
+
+ if(static_threads[i].start_routine == backends_main)
+ static_threads[i].enabled = backends_thread;
}
- if(central_netdata_to_push_data)
+ if(rrdpush_exclusive)
return;
web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
// --------------------------------------------------------------------
// find we need to send data to a central netdata
- central_netdata_to_push_data = config_get("global", "central netdata to send all data", "");
- if(central_netdata_to_push_data && !*central_netdata_to_push_data)
- central_netdata_to_push_data = NULL;
+ rrdpush_init();
// --------------------------------------------------------------------
// get default memory mode for the database
- if(central_netdata_to_push_data) {
+ if(rrdpush_exclusive) {
default_rrd_memory_mode = RRD_MEMORY_MODE_RAM;
config_set("global", "memory mode", rrd_memory_mode_name(default_rrd_memory_mode));
}
// --------------------------------------------------------------------
// get default database size
- if(central_netdata_to_push_data) {
+ if(rrdpush_exclusive) {
default_rrd_history_entries = 10;
config_set_number("global", "history", default_rrd_history_entries);
}
// --------------------------------------------------------------------
// create the listening sockets
- if(!check_config && !central_netdata_to_push_data) {
+ if(!check_config && !rrdpush_exclusive) {
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir);
appconfig_load(&stream_config, filename, 0);
char filename[FILENAME_MAX + 1];
// registry enabled?
- if(!central_netdata_to_push_data) {
+ if(!rrdpush_exclusive) {
registry.enabled = config_get_boolean("registry", "enabled", 0);
}
else {
// are created or renamed, that match them
RRDCALCTEMPLATE *templates;
+ char *os; // the O/S type of the host
+ volatile size_t use_counter; // when remote hosts are streaming to this
+ // host, this is the counter of connected clients
+
// health / alarm settings
char *health_default_exec;
char *health_default_recipient;
extern void rrd_init(char *hostname);
extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash);
-extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled);
+extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled);
#ifdef NETDATA_INTERNAL_CHECKS
extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
",\n\t\"charts\": {"
, host->hostname
, program_version
- , os_type
+ , host->os
, host->rrd_update_every
, host->rrd_history_entries
);
host->hash_hostname = simple_hash(host->hostname);
}
+static inline void rrdhost_init_os(RRDHOST *host, const char *os) {
+ freez(host->os);
+ host->os = strdupz(os?os:"unknown");
+}
+
static inline void rrdhost_init_machine_guid(RRDHOST *host, const char *machine_guid) {
strncpy(host->machine_guid, machine_guid, GUID_LEN);
host->machine_guid[GUID_LEN] = '\0';
RRDHOST *rrdhost_create(const char *hostname,
const char *guid,
+ const char *os,
int update_every,
int entries,
RRD_MEMORY_MODE memory_mode,
rrdhost_init_hostname(host, hostname);
rrdhost_init_machine_guid(host, guid);
+ rrdhost_init_os(host, os);
avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name);
return host;
}
-RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) {
+RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) {
debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid);
RRDHOST *host = rrdhost_find(guid, 0);
if(!host) {
- host = rrdhost_create(hostname, guid, update_every, history, mode, health_enabled);
+ host = rrdhost_create(hostname, guid, os, update_every, history, mode, health_enabled);
}
else {
host->health_enabled = health_enabled;
localhost = rrdhost_create(hostname,
registry_get_this_machine_guid(),
+ os_type,
default_rrd_update_every,
default_rrd_history_entries,
default_rrd_memory_mode,
// ------------------------------------------------------------------------
// free it
+ freez(host->os);
freez(host->cache_dir);
freez(host->varlib_dir);
freez(host->health_default_exec);
#include "common.h"
+int rrdpush_enabled = 0;
+int rrdpush_exclusive = 1;
+
+static char *central_netdata = NULL;
+static char *api_key = NULL;
+
+#define CONNECTED_TO_SIZE 100
+
+// data collection happens from multiple threads
+// each of these threads calls rrdset_done()
+// which in turn calls rrdset_done_push()
+// which uses this pipe to notify the streaming thread
+// that there are more data ready to be sent
#define PIPE_READ 0
#define PIPE_WRITE 1
+int rrdpush_pipe[2] = { -1, -1 };
-int rrdpush_pipe[2];
-
+// a buffer used to store data to be sent.
+// the format is the same as external plugins.
static BUFFER *rrdpush_buffer = NULL;
+
+// locking to get exclusive access to shared resources
+// (rrdpush_pipe[PIPE_WRITE], rrdpush_buffer
static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// if the streaming thread is connected to a central netdata
+// this is set to 1, otherwise 0.
static volatile int rrdpush_connected = 0;
-static inline void rrdpush_lock() {
- pthread_mutex_lock(&rrdpush_mutex);
-}
+// to have the remote netdata re-sync the charts
+// to its current clock, we send for this many
+// iterations a BEGIN line without microseconds
+// this is for the first iterations of each chart
+static unsigned int remote_clock_resync_iterations = 60;
-static inline void rrdpush_unlock() {
- pthread_mutex_unlock(&rrdpush_mutex);
-}
+#define rrdpush_lock() pthread_mutex_lock(&rrdpush_mutex)
+#define rrdpush_unlock() pthread_mutex_unlock(&rrdpush_mutex)
+// checks if the current chart definition has been sent
static inline int need_to_send_chart_definition(RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st)
if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
return 1;
-
- // fprintf(stderr, "NOT Sending CHART '%s' '%s'\n", st->id, st->name);
return 0;
}
+// sends the current chart definition
static inline void send_chart_definition(RRDSET *st) {
- // fprintf(stderr, "Sending CHART '%s' '%s'\n", st->id, st->name);
-
buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
, st->id
, st->name
}
}
+// sends the current chart dimensions
static inline void send_chart_metrics(RRDSET *st) {
- buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > 60)?st->usec_since_last_update:0);
+ buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
buffer_strcat(rrdpush_buffer, "END\n");
}
+// resets all the chart, so that their definitions
+// will be resent to the central netdata
static void reset_all_charts(void) {
rrd_rdlock();
if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
return;
-
rrdpush_lock();
if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
rrdpush_unlock();
}
+int rrdpush_init() {
+ rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled);
+ rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive);
+ central_netdata = config_get("stream", "stream metrics to", "");
+ api_key = config_get("stream", "api key", "");
+
+ if(!rrdpush_enabled || !central_netdata || !*central_netdata || !api_key || !*api_key) {
+ rrdpush_enabled = 0;
+ rrdpush_exclusive = 0;
+ }
+
+ return rrdpush_enabled;
+}
+
void *central_netdata_push_thread(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
error("STREAM: cannot set pthread cancel state to ENABLE.");
+ int timeout = (int)config_get_number("stream", "timeout seconds", 60);
+ int default_port = (int)config_get_number("stream", "default port", 19999);
+ size_t max_size = (size_t)config_get_number("stream", "buffer size bytes", 1024 * 1024);
+ unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5);
+ remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations);
+ int sock = -1;
- rrdpush_buffer = buffer_create(1);
-
- if(pipe(rrdpush_pipe) == -1)
- fatal("STREAM: cannot create required pipe.");
-
- struct timeval tv = {
- .tv_sec = 60,
- .tv_usec = 0
- };
+ if(!rrdpush_enabled || !central_netdata || !*central_netdata || !api_key || !*api_key)
+ goto cleanup;
+ // initialize rrdpush globals
+ rrdpush_buffer = buffer_create(1);
rrdpush_connected = 0;
+ if(pipe(rrdpush_pipe) == -1) fatal("STREAM: cannot create required pipe.");
+
+ // initialize local variables
size_t begin = 0;
- size_t max_size = 1024 * 1024;
size_t reconnects_counter = 0;
size_t sent_bytes = 0;
size_t sent_connection = 0;
- int sock = -1;
+
+ struct timeval tv = {
+ .tv_sec = timeout,
+ .tv_usec = 0
+ };
struct pollfd fds[2], *ifd, *ofd;
nfds_t fdmax;
ifd = &fds[0];
ofd = &fds[1];
+ char connected_to[CONNECTED_TO_SIZE + 1];
+
for(;;) {
if(netdata_exit) break;
// they will be lost, so there is no point to do it
rrdpush_connected = 0;
- info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
- sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
+ info("STREAM: connecting to central netdata at: %s", central_netdata);
+ sock = connect_to_one_of(central_netdata, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
if(unlikely(sock == -1)) {
- error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
- sleep(5);
+ error("STREAM: failed to connect to central netdata at: %s", central_netdata);
+ sleep(reconnect_delay);
continue;
}
- info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
+ info("STREAM: initializing communication to central netdata at: %s", connected_to);
char http[1000 + 1];
- snprintfz(http, 1000, "GET /stream?key=%s&hostname=%s&machine_guid=%s&update_every=%d HTTP/1.1\r\n"
+ snprintfz(http, 1000,
+ "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n"
"User-Agent: netdata-push-service/%s\r\n"
"Accept: */*\r\n\r\n"
- , config_get("global", "central netdata api key", "")
+ , api_key
, localhost->hostname
, localhost->machine_guid
+ , localhost->os
, default_rrd_update_every
, program_version
);
- if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
+ if(send_timeout(sock, http, strlen(http), 0, timeout) == -1) {
close(sock);
sock = -1;
- error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
- sleep(5);
+ error("STREAM: failed to send http header to netdata at: %s", connected_to);
+ sleep(reconnect_delay);
continue;
}
- info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
+ info("STREAM: Waiting for STREAM from central netdata at: %s", connected_to);
- if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
+ if(recv_timeout(sock, http, 1000, 0, timeout) == -1) {
close(sock);
sock = -1;
- error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
- sleep(5);
+ error("STREAM: failed to receive STREAM from netdata at: %s", connected_to);
+ sleep(reconnect_delay);
continue;
}
if(strncmp(http, "STREAM", 6)) {
close(sock);
sock = -1;
- error("STREAM: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
- sleep(5);
+ error("STREAM: server at %s, did not send STREAM", connected_to);
+ sleep(reconnect_delay);
continue;
}
- info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
+ info("STREAM: Established communication with central netdata at: %s - sending metrics...", connected_to);
if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
error("STREAM: cannot set non-blocking mode for socket.");
}
if(netdata_exit) break;
- int retval = poll(fds, fdmax, 60 * 1000);
+ int retval = poll(fds, fdmax, timeout * 1000);
if(netdata_exit) break;
if(unlikely(retval == -1)) {
ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
if(errno != EAGAIN && errno != EINTR) {
- error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
+ error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", connected_to, sent_connection);
close(sock);
sock = -1;
}
}
}
+cleanup:
debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
- if(sock != -1) {
- close(sock);
- }
+
+ // make sure the data collection threads do not write data
+ rrdpush_connected = 0;
+
+ // close the pipe
+ if(rrdpush_pipe[PIPE_READ] != -1) close(rrdpush_pipe[PIPE_READ]);
+ if(rrdpush_pipe[PIPE_WRITE] != -1) close(rrdpush_pipe[PIPE_WRITE]);
+
+ // close the socket
+ if(sock != -1) close(sock);
+
+ rrdpush_lock();
+ buffer_free(rrdpush_buffer);
+ rrdpush_buffer = NULL;
+ rrdpush_unlock();
static_thread->enabled = 0;
pthread_exit(NULL);
#ifndef NETDATA_RRDPUSH_H
#define NETDATA_RRDPUSH_H
+extern int rrdpush_enabled;
+extern int rrdpush_exclusive;
+
+extern int rrdpush_init();
extern void rrdset_done_push(RRDSET *st);
extern void *central_netdata_push_thread(void *ptr);
// RRDSET - helpers for rrdset_create()
inline long align_entries_to_pagesize(long entries) {
- if(central_netdata_to_push_data)
+ if(rrdpush_exclusive)
return entries;
if(entries < 5) entries = 5;
rrdset_update_last_collected_time(st);
}
- rrdset_done_push(st);
-
st->counter++;
st->counter_done++;
+
+ rrdset_done_push(st);
}
void rrdset_done(RRDSET *st) {
if(unlikely(netdata_exit)) return;
- if(unlikely(central_netdata_to_push_data)) {
+ if(unlikely(rrdpush_exclusive)) {
rrdset_done_push_int(st);
return;
}
if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
+
+ if(unlikely(rrdpush_enabled))
+ rrdset_done_push_int(st);
}
return fd;
}
-int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter) {
+int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) {
int sock = -1;
const char *s = destination;
strncpyz(buf, s, e - s);
if(reconnects_counter) *reconnects_counter += 1;
sock = connect_to(buf, default_port, timeout);
- if(sock != -1) break;
+ if(sock != -1) {
+ if(connected_to && connected_to_size) {
+ strncpy(connected_to, buf, connected_to_size);
+ connected_to[connected_to_size - 1] = '\0';
+ }
+ break;
+ }
s = e;
}
#define NETDATA_SOCKET_H
extern int connect_to(const char *definition, int default_port, struct timeval *timeout);
-extern int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter);
+extern int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size);
extern ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout);
extern ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout);
return(b);
}
-void buffer_free(BUFFER *b)
-{
+void buffer_free(BUFFER *b) {
+ if(unlikely(!b)) return;
+
buffer_overflow_check(b);
debug(D_WEB_BUFFER, "Freeing web buffer of size %zu.", b->size);
return 0;
}
-struct web_client *web_client_create(int listener)
-{
+struct web_client *web_client_create(int listener) {
struct web_client *w;
w = callocz(1, sizeof(struct web_client));
if(w->prev) w->prev->next = w->next;
if(w->next) w->next->prev = w->prev;
- if(w->response.header_output) buffer_free(w->response.header_output);
- if(w->response.header) buffer_free(w->response.header);
- if(w->response.data) buffer_free(w->response.data);
+ buffer_free(w->response.header_output);
+ buffer_free(w->response.header);
+ buffer_free(w->response.data);
if(w->ifd != -1) close(w->ifd);
if(w->ofd != -1 && w->ofd != w->ifd) close(w->ofd);
freez(w);
}
cleanup:
- if(dimensions)
- buffer_free(dimensions);
+ buffer_free(dimensions);
return ret;
}
buffer_strcat(w->response.data, ");");
cleanup:
- if(dimensions) buffer_free(dimensions);
+ buffer_free(dimensions);
return ret;
}
}
int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
- char *key = NULL, *hostname = NULL, *machine_guid = NULL;
+ char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
int update_every = default_rrd_update_every;
int history = default_rrd_history_entries;
RRD_MEMORY_MODE mode = default_rrd_memory_mode;
machine_guid = value;
else if(!strcmp(name, "update_every"))
update_every = (int)strtoul(value, NULL, 0);
+ else if(!strcmp(name, "os"))
+ os = value;
}
if(!key || !*key) {
return 404;
}
- // update_every = (int)appconfig_get_number(&stream_config, key, "default update every", update_every);
update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
if(update_every < 0) update_every = 1;
health_enabled = appconfig_get_boolean_ondemand(&stream_config, key, "health enabled by default", health_enabled);
health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
- host = rrdhost_find_or_create(hostname, machine_guid, update_every, history, mode, health_enabled?1:0);
+ if(strcmp(machine_guid, "localhost"))
+ host = localhost;
+ else
+ host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, health_enabled?1:0);
info("STREAM request from client '%s:%s' for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s",
w->client_ip, w->client_port,
return 500;
}
+ rrdhost_wrlock(host);
+ host->use_counter++;
+ rrdhost_unlock(host);
+
// call the plugins.d processor to receive the metrics
info("STREAM [%s]:%s: connecting client to plugins.d on host '%s' with machine GUID '%s'.", w->client_ip, w->client_port, host->hostname, host->machine_guid);
size_t count = pluginsd_process(host, &cd, fp, 1);
error("STREAM [%s]:%s: client disconnected (host '%s', machine GUID '%s').", w->client_ip, w->client_port, host->hostname, host->machine_guid);
- if(health_enabled == CONFIG_BOOLEAN_AUTO)
+
+ rrdhost_wrlock(host);
+ host->use_counter--;
+ if(!host->use_counter && health_enabled == CONFIG_BOOLEAN_AUTO)
host->health_enabled = 0;
+ rrdhost_unlock(host);
- // close all sockets, to let the socket worker we are done
+ // cleanup
fclose(fp);
w->ifd = -1;
- if(w->ofd != -1 && w->ofd != w->ifd) {
- close(w->ofd);
- w->ofd = -1;
- }
// this will not send anything
// the socket is closed
encoded_url = s = &s[8];
w->mode = WEB_CLIENT_MODE_OPTIONS;
}
+ else if(!strncmp(s, "STREAM ", 8)) {
+ encoded_url = s = &s[8];
+ w->mode = WEB_CLIENT_MODE_STREAM;
+ }
else {
w->wait_receive = 0;
return HTTP_VALIDATION_NOT_SUPPORTED;
hash_graph = 0,
hash_list = 0,
hash_all_json = 0,
- hash_host = 0,
- hash_stream = 0;
+ hash_host = 0;
#ifdef NETDATA_INTERNAL_CHECKS
static uint32_t hash_exit = 0, hash_debug = 0, hash_mirror = 0;
hash_list = simple_hash("list");
hash_all_json = simple_hash("all.json");
hash_host = simple_hash("host");
- hash_stream = simple_hash("stream");
#ifdef NETDATA_INTERNAL_CHECKS
hash_exit = simple_hash("exit");
hash_debug = simple_hash("debug");
debug(D_WEB_CLIENT_ACCESS, "%llu: host switch request ...", w->id);
return web_client_switch_host(host, w, url);
}
- else if(unlikely(hash == hash_stream && strcmp(tok, "stream") == 0)) {
- debug(D_WEB_CLIENT_ACCESS, "%llu: stream request ...", w->id);
- return web_client_stream_request(host, w, url);
- }
else if(unlikely(hash == hash_netdata_conf && strcmp(tok, "netdata.conf") == 0)) {
debug(D_WEB_CLIENT_ACCESS, "%llu: Sending netdata.conf ...", w->id);
w->response.data->contenttype = CT_TEXT_PLAIN;
buffer_strcat(w->response.data, "OK");
w->response.code = 200;
}
+ else if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM)) {
+ w->response.code = web_client_stream_request(localhost, w, w->decoded_url);
+ return;
+ }
else
w->response.code = web_client_process_url(localhost, w, w->decoded_url);
break;
else w->wait_send = 0;
switch(w->mode) {
+ case WEB_CLIENT_MODE_STREAM:
+ debug(D_WEB_CLIENT, "%llu: STREAM done.", w->id);
+ break;
+
case WEB_CLIENT_MODE_OPTIONS:
debug(D_WEB_CLIENT, "%llu: Done preparing the OPTIONS response. Sending data (%zu bytes) to client.", w->id, w->response.data->len);
break;
break;
default:
- fatal("%llu: Unknown client mode %d.", w->id, w->mode);
+ fatal("%llu: Unknown client mode %u.", w->id, w->mode);
break;
}
}
// if the sockets are closed, may have transferred this client
// to plugins.d
- if(w->ifd == -1 && w->ofd == -1) break;
+ if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM))
+ break;
}
}
extern int respect_web_browser_do_not_track_policy;
extern char *web_x_frame_options;
-#define WEB_CLIENT_MODE_NORMAL 0
-#define WEB_CLIENT_MODE_FILECOPY 1
-#define WEB_CLIENT_MODE_OPTIONS 2
+typedef enum web_client_mode {
+ WEB_CLIENT_MODE_NORMAL = 0,
+ WEB_CLIENT_MODE_FILECOPY = 1,
+ WEB_CLIENT_MODE_OPTIONS = 2,
+ WEB_CLIENT_MODE_STREAM = 3
+} WEB_CLIENT_MODE;
#define URL_MAX 8192
#define ZLIB_CHUNK 16384
uint8_t keepalive:1; // if set to 1, the web client will be re-used
- uint8_t mode:3; // the operational mode of the client
-
uint8_t wait_receive:1; // 1 = we are waiting more input data
uint8_t wait_send:1; // 1 = we have data to send to the client
uint8_t donottrack:1; // 1 = we should not set cookies on this client
uint8_t tracking_required:1; // 1 = if the request requires cookies
+ WEB_CLIENT_MODE mode; // the operational mode of the client
+
int tcp_cork; // 1 = we have a cork on the socket
int ifd;