]> arthur.barton.de Git - netdata.git/commitdiff
moved [stream] configuration to stream.conf
authorCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Sat, 25 Feb 2017 10:46:12 +0000 (12:46 +0200)
committerCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Sat, 25 Feb 2017 10:46:12 +0000 (12:46 +0200)
13 files changed:
conf.d/Makefile.am
conf.d/aggregated_hosts.conf [deleted file]
conf.d/stream.conf [new file with mode: 0644]
src/main.c
src/registry.h
src/registry_internals.c
src/registry_internals.h
src/registry_machine.c
src/registry_person.c
src/rrd.h
src/rrdhost.c
src/rrdpush.c
src/rrdpush.h

index dd6c2cbf9e2a0272cefbb873058c67c105c73943..743ab19e93cf041e7df894b712e3b22f1554e9a4 100644 (file)
@@ -4,7 +4,6 @@
 MAINTAINERCLEANFILES= $(srcdir)/Makefile.in
 
 dist_config_DATA = \
-    aggregated_hosts.conf \
     apps_groups.conf \
     charts.d.conf \
     fping.conf \
@@ -12,6 +11,7 @@ dist_config_DATA = \
     python.d.conf \
     health_alarm_notify.conf \
     health_email_recipients.conf \
+    stream.conf \
     $(NULL)
 
 nodeconfigdir=$(configdir)/node.d
diff --git a/conf.d/aggregated_hosts.conf b/conf.d/aggregated_hosts.conf
deleted file mode 100644 (file)
index d192a55..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-# netdata configuration for aggregating data from remote hosts
-#
-# API keys authorize a pair of sending-receiving netdata servers.
-# Once their communication is authorized, they can exchange metrics for any
-# number of hosts.
-#
-# You can generate API keys, with the linux command: uuidgen
-#
-# -----------------------------------------------------------------------------
-# 1. SLAVE NETDATA - THE ONE THAT WILL BE SENDING METRICS
-#
-#    In /etc/netdata/netdata.conf you have a configuration like this:
-#
-#    [stream]
-#
-#        # enable or disable sending metrics.
-#        # default: no
-#           enabled = yes
-#
-#        # one or more netdata hosts to send metrics to.
-#        # only one will get them, the first available.
-#           destination = HOST1:PORT1 HOST2:PORT2 ...
-#
-#        # the API key to use, to authorize ourselves
-#           api key =  API_KEY
-#
-# -----------------------------------------------------------------------------
-# 2. MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS
-#
-#    Use this file to define the API keys is accepts.
-#
-#    You can have one API key per slave, or the same API key for all slaves.
-#
-#    All options below are used in this order:
-#
-#    a) MACHINE_GUID (settings for each machine)
-#    b) API_KEY      (settings for the API key)
-#    c) this netdata defaults (as in netdata.conf)
-#
-#    You can combine the above (the more specific setting will be used).
-#
-# -----------------------------------------------------------------------------
-
-# API key authentication
-# If the key is not listed here, it will not be able to connect.
-
-[API_KEY]
-    # Default settings for the API key
-
-    # You can disable the API key, by setting this to: no
-    # The default (for unknown API keys) is also: no
-#    enabled = yes
-
-    # The default history in entries, for all hosts using this API key.
-    # You can also set it per host below.
-    # If you don't set it here, the history size of the central netdata
-    # will be used
-#    default history = 3600
-
-    # The default memory mode to be used for all hosts using this API key.
-    # You can also set it per host below.
-    # If you don't set it here, the memory mode of the central netdata
-    # will be used.
-    # Valid modes: save (load/save db), map (like swap), ram (no disk)
-#    default memory mode = save
-
-    # Shall we enable health monitoring for the hosts using this API key?
-    # 3 values:
-    #    yes     enable alarms
-    #    no      do not enable alarms
-    #    auto    enable alarms, only when the host is streaming metrics
-    # You can also set it per host, below.
-    # The default is the same as the central netdata
-#    health enabled by default = auto
-
-
-# -----------------------------------------------------------------------------
-# Each netdata has a unique GUID - generated the first time netdata starts.
-# You can find it at /var/lib/netdata/registry/netdata.public.unique.id
-# The host sending data will have one. If it is static and you can find it,
-# you can give settings for each specific host here.
-
-[MACHINE_GUID]
-    # This can be used to stop receiving data
-    # THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID.
-    # Use only the API key for security.
-#    enabled = yes
-
-    # The number of entries in the database
-#    history = 3600
-
-    # The memory mode of the database
-#    memory mode = save
-
-    # Health / alarms control: yes | no | auto
-#    health enabled = yes
-
diff --git a/conf.d/stream.conf b/conf.d/stream.conf
new file mode 100644 (file)
index 0000000..2dcb25a
--- /dev/null
@@ -0,0 +1,118 @@
+# netdata configuration for aggregating data from remote hosts
+#
+# API keys authorize a pair of sending-receiving netdata servers.
+# Once their communication is authorized, they can exchange metrics for any
+# number of hosts.
+#
+# You can generate API keys, with the linux command: uuidgen
+#
+# -----------------------------------------------------------------------------
+# 1. SLAVE NETDATA - THE ONE THAT WILL BE SENDING METRICS
+
+[stream]
+       enabled = no
+
+       # where to send metrics to?
+       # A space separated list of IP:PORT is accepted. The first available will
+       # get the metrics.
+       # IPv6 addresses should be [IP]:PORT
+       destination =
+
+       # The API_KEY to use (as the sender)
+       api key =
+
+       # other options (uncomment and set)
+
+       # timeout seconds = 60
+       # default port = 19999
+       # buffer size bytes = 1048576
+       # reconnect delay seconds = 5
+       # initial clock resync iterations = 60
+
+
+# -----------------------------------------------------------------------------
+# 2. MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS
+#
+#    You can have one API key per slave, or the same API key for all slaves.
+#
+#    All options below are used in this order:
+#
+#    a) MACHINE_GUID (settings for each machine)
+#    b) API_KEY      (settings for the API key)
+#    c) this netdata defaults (as in netdata.conf)
+#
+#    You can combine the above (the more specific setting will be used).
+
+# API key authentication
+# If the key is not listed here, it will not be able to connect.
+
+[API_KEY]
+    # Default settings for the API key
+
+    # You can disable the API key, by setting this to: no
+    # The default (for unknown API keys) is also: no
+    enabled = no
+
+    # The default history in entries, for all hosts using this API key.
+    # You can also set it per host below.
+    # If you don't set it here, the history size of the central netdata
+    # will be used
+    default history = 3600
+
+    # The default memory mode to be used for all hosts using this API key.
+    # You can also set it per host below.
+    # If you don't set it here, the memory mode of netdata.conf will be used.
+    # Valid modes:
+    #    save    save on exit, load on start
+    #    map     like swap (continuously syncing to disks)
+    #    ram     keep it in RAM, don't touch the disk
+    #    none    no database (passing through this netdata)
+    default memory mode = ram
+
+    # Shall we enable health monitoring for the hosts using this API key?
+    # 3 values:
+    #    yes     enable alarms
+    #    no      do not enable alarms
+    #    auto    enable alarms, only when the sending netdata is connected
+    # You can also set it per host, below.
+    # The default is the same as to netdata.conf
+    health enabled by default = auto
+
+    # postpone alarms for a short period after the sender is connected
+    default postpone alarms on connect seconds = 60
+
+    # need to route metrics differently? set these.
+    # the defaults are the ones at the [stream] section
+    #default proxy enabled = yes | no
+    #default proxy destination = IP:PORT IP:PORT ...
+    #default proxy api key = API_KEY
+
+
+# -----------------------------------------------------------------------------
+# Each netdata has a unique GUID - generated the first time netdata starts.
+# You can find it at /var/lib/netdata/registry/netdata.public.unique.id
+# The host sending data will have one. If it is static and you can find it,
+# you can give settings for each specific host here.
+
+[MACHINE_GUID]
+    # enable this host: yes | no
+    # THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID.
+    # Use only the API key for security.
+    enabled = no
+
+    # The number of entries in the database
+    history = 3600
+
+    # The memory mode of the database: save | map | ram | none
+    memory mode = save
+
+    # Health / alarms control: yes | no | auto
+    health enabled = yes
+
+    # postpone alarms when the sender connects
+    postpone alarms on connect seconds = 60
+
+    # need to route metrics differently?
+    #proxy enabled = yes | no
+    #proxy destination = IP:PORT IP:PORT ...
+    #proxy api key = API_KEY
index 7f590aca605f970a90543a07a005b6d8cd3aeb0e..ae8f599aae35a54fb99cd2897e301bdc96074b70 100644 (file)
@@ -705,6 +705,8 @@ int main(int argc, char **argv) {
     if(!config_loaded)
         config_load(NULL, 0);
 
+    // ------------------------------------------------------------------------
+    // initialize netdata
     {
         char *pmax = config_get(CONFIG_SECTION_GLOBAL, "glibc malloc arena max for plugins", "1");
         if(pmax && *pmax)
@@ -757,6 +759,16 @@ int main(int argc, char **argv) {
         log_init();
         error_log_limit_unlimited();
 
+
+        // --------------------------------------------------------------------
+        // load stream.conf
+        {
+            char filename[FILENAME_MAX + 1];
+            snprintfz(filename, FILENAME_MAX, "%s/stream.conf", netdata_configured_config_dir);
+            appconfig_load(&stream_config, filename, 0);
+        }
+
+
         // --------------------------------------------------------------------
         // setup process signals
 
@@ -851,15 +863,6 @@ int main(int argc, char **argv) {
 
         if(web_server_mode != WEB_SERVER_MODE_NONE)
             create_listen_sockets();
-
-
-        // --------------------------------------------------------------------
-        // load the aggregated host configuration file
-        {
-            char filename[FILENAME_MAX + 1];
-            snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir);
-            appconfig_load(&stream_config, filename, 0);
-        }
     }
 
     // initialize the log files
index 7940cbb7a9216e79e118e49328fc5979fd76a75e..2c4592b922e20e5abe8423fa06abdde14489ab0f 100644 (file)
@@ -70,5 +70,6 @@ extern int registry_request_hello_json(RRDHOST *host, struct web_client *w);
 extern void registry_statistics(void);
 
 extern char *registry_get_this_machine_guid(void);
+extern int regenerate_guid(const char *guid, char *result);
 
 #endif /* NETDATA_REGISTRY_H */
index df330cc8a73ff84bb3284e79e85928a4ace400a3..9ec91ba401eb2a673421411545817f7651251d10 100644 (file)
@@ -7,7 +7,7 @@ struct registry registry;
 
 // parse a GUID and re-generated to be always lower case
 // this is used as a protection against the variations of GUIDs
-int registry_regenerate_guid(const char *guid, char *result) {
+int regenerate_guid(const char *guid, char *result) {
     uuid_t uuid;
     if(unlikely(uuid_parse(guid, uuid) == -1)) {
         info("Registry: GUID '%s' is not a valid GUID.", guid);
@@ -18,7 +18,7 @@ int registry_regenerate_guid(const char *guid, char *result) {
 
 #ifdef NETDATA_INTERNAL_CHECKS
         if(strcmp(guid, result))
-            info("Registry: source GUID '%s' and re-generated GUID '%s' differ!", guid, result);
+            info("GUID '%s' and re-generated GUID '%s' differ!", guid, result);
 #endif /* NETDATA_INTERNAL_CHECKS */
     }
 
@@ -96,14 +96,14 @@ REGISTRY_PERSON_URL *registry_verify_request(char *person_guid, char *machine_gu
     url = registry_fix_url(url, NULL);
 
     // make sure the person GUID is valid
-    if(registry_regenerate_guid(person_guid, pbuf) == -1) {
+    if(regenerate_guid(person_guid, pbuf) == -1) {
         info("Registry Request Verification: invalid person GUID, person: '%s', machine '%s', url '%s'", person_guid, machine_guid, url);
         return NULL;
     }
     person_guid = pbuf;
 
     // make sure the machine GUID is valid
-    if(registry_regenerate_guid(machine_guid, mbuf) == -1) {
+    if(regenerate_guid(machine_guid, mbuf) == -1) {
         info("Registry Request Verification: invalid machine GUID, person: '%s', machine '%s', url '%s'", person_guid, machine_guid, url);
         return NULL;
     }
@@ -226,7 +226,7 @@ REGISTRY_MACHINE *registry_request_machine(char *person_guid, char *machine_guid
     if(!pu || !p || !m) return NULL;
 
     // make sure the machine GUID is valid
-    if(registry_regenerate_guid(request_machine, mbuf) == -1) {
+    if(regenerate_guid(request_machine, mbuf) == -1) {
         info("Registry Machine URLs request: invalid machine GUID, person: '%s', machine '%s', url '%s', request machine '%s'", p->guid, m->guid, pu->url->url, request_machine);
         return NULL;
     }
@@ -288,7 +288,7 @@ char *registry_get_this_machine_guid(void) {
             error("Failed to read machine GUID from '%s'", registry.machine_guid_filename);
         else {
             buf[GUID_LEN] = '\0';
-            if(registry_regenerate_guid(buf, guid) == -1) {
+            if(regenerate_guid(buf, guid) == -1) {
                 error("Failed to validate machine GUID '%s' from '%s'. Ignoring it - this might mean this netdata will appear as duplicate in the registry.",
                         buf, registry.machine_guid_filename);
 
index 8aef16b22ab94d66bac89dd2cd0aed4d5deae3bb..27c2fe0745de4e1b5d09680eae75a4eea058b919 100644 (file)
@@ -59,7 +59,7 @@ struct registry {
     pthread_mutex_t lock;
 };
 
-extern int registry_regenerate_guid(const char *guid, char *result);
+extern int regenerate_guid(const char *guid, char *result);
 
 #include "registry_url.h"
 #include "registry_machine.h"
index 3510736df4de73ed35ea247c024e194d4e677c14..6dc8200d39522f83c94e7d58fff2ea960b908949 100644 (file)
@@ -58,7 +58,7 @@ REGISTRY_MACHINE *registry_machine_get(const char *machine_guid, time_t when) {
     if(likely(machine_guid && *machine_guid)) {
         // validate it is a GUID
         char buf[GUID_LEN + 1];
-        if(unlikely(registry_regenerate_guid(machine_guid, buf) == -1))
+        if(unlikely(regenerate_guid(machine_guid, buf) == -1))
             info("Registry: machine guid '%s' is not a valid guid. Ignoring it.", machine_guid);
         else {
             machine_guid = buf;
index 5f9099c9aa6995eb2bc761910c31865a680393d9..409c76925ba54719cff0fc85c4e75c0753331cbb 100644 (file)
@@ -183,7 +183,7 @@ REGISTRY_PERSON *registry_person_get(const char *person_guid, time_t when) {
     if(person_guid && *person_guid) {
         char buf[GUID_LEN + 1];
         // validate it is a GUID
-        if(unlikely(registry_regenerate_guid(person_guid, buf) == -1))
+        if(unlikely(regenerate_guid(person_guid, buf) == -1))
             info("Registry: person guid '%s' is not a valid guid. Ignoring it.", person_guid);
         else {
             person_guid = buf;
index 2cb06607f51c14ba928503599a294a0c04028899..14ef0f721275eb666388b7b18784d664e0c85ec6 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -342,6 +342,8 @@ struct rrdhost {
     int rrd_history_entries;                        // the number of history entries for the host's charts
 
     int rrdpush_enabled;                            // 1 when this host sends metrics to another netdata
+    char *rrdpush_destination;                      // where to send metrics to
+    char *rrdpush_api_key;                          // the api key at the receiving netdata
     volatile int rrdpush_connected;                 // 1 when the sender is ready to push metrics
     volatile int rrdpush_spawn;                     // 1 when the sender thread has been spawn
     volatile int rrdpush_error_shown;               // 1 when we have logged a communication error
@@ -423,7 +425,18 @@ extern pthread_rwlock_t rrd_rwlock;
 extern void rrd_init(char *hostname);
 
 extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash);
-extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled);
+extern RRDHOST *rrdhost_find_or_create(
+        const char *hostname
+        , const char *guid
+        , const char *os
+        , int update_every
+        , int history
+        , RRD_MEMORY_MODE mode
+        , int health_enabled
+        , int rrdpush_enabled
+        , char *rrdpush_destination
+        , char *rrdpush_api_key
+);
 
 #ifdef NETDATA_INTERNAL_CHECKS
 extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
index 6e9d7baade02d0c3e9244e58bf49f9eb08bb0806..658fba001290c3ab565740b4d024e06aa0ad3347 100644 (file)
@@ -65,6 +65,9 @@ RRDHOST *rrdhost_create(const char *hostname,
         int entries,
         RRD_MEMORY_MODE memory_mode,
         int health_enabled,
+        int rrdpush_enabled,
+        char *rrdpush_destination,
+        char *rrdpush_api_key,
         int is_localhost
 ) {
 
@@ -76,11 +79,13 @@ RRDHOST *rrdhost_create(const char *hostname,
     host->rrd_history_entries = entries;
     host->rrd_memory_mode     = memory_mode;
     host->health_enabled      = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled;
-    host->rrdpush_enabled     = default_rrdpush_enabled;
+    host->rrdpush_enabled     = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key);
+    host->rrdpush_destination = (host->rrdpush_enabled)?strdupz(rrdpush_destination):NULL;
+    host->rrdpush_api_key     = (host->rrdpush_enabled)?strdupz(rrdpush_api_key):NULL;
 
     host->rrdpush_pipe[0] = -1;
     host->rrdpush_pipe[1] = -1;
-    host->rrdpush_socket = -1;
+    host->rrdpush_socket  = -1;
 
     pthread_mutex_init(&host->rrdpush_mutex, NULL);
     pthread_rwlock_init(&host->rrdhost_rwlock, NULL);
@@ -201,6 +206,7 @@ RRDHOST *rrdhost_create(const char *hostname,
                      ", memory mode: %s"
                      ", history entries: %d"
                      ", streaming: %s"
+                     " to: '%s' (api key: '%s')"
                      ", health: %s"
                      ", cache_dir: '%s'"
                      ", varlib_dir: '%s'"
@@ -214,6 +220,8 @@ RRDHOST *rrdhost_create(const char *hostname,
              , rrd_memory_mode_name(host->rrd_memory_mode)
              , host->rrd_history_entries
              , host->rrdpush_enabled?"enabled":"disabled"
+             , host->rrdpush_destination
+             , host->rrdpush_api_key
              , host->health_enabled?"enabled":"disabled"
              , host->cache_dir
              , host->varlib_dir
@@ -228,12 +236,35 @@ RRDHOST *rrdhost_create(const char *hostname,
     return host;
 }
 
-RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) {
+RRDHOST *rrdhost_find_or_create(
+          const char *hostname
+        , const char *guid
+        , const char *os
+        , int update_every
+        , int history
+        , RRD_MEMORY_MODE mode
+        , int health_enabled
+        , int rrdpush_enabled
+        , char *rrdpush_destination
+        , char *rrdpush_api_key
+) {
     debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid);
 
     RRDHOST *host = rrdhost_find(guid, 0);
     if(!host) {
-        host = rrdhost_create(hostname, guid, os, update_every, history, mode, health_enabled, 0);
+        host = rrdhost_create(
+                hostname
+                , guid
+                , os
+                , update_every
+                , history
+                , mode
+                , health_enabled
+                , rrdpush_enabled
+                , rrdpush_destination
+                , rrdpush_api_key
+                , 0
+        );
     }
     else {
         host->health_enabled = health_enabled;
@@ -267,14 +298,18 @@ void rrd_init(char *hostname) {
     rrdpush_init();
 
     debug(D_RRDHOST, "Initializing localhost with hostname '%s'", hostname);
-    localhost = rrdhost_create(hostname,
-            registry_get_this_machine_guid(),
-            os_type,
-            default_rrd_update_every,
-            default_rrd_history_entries,
-            default_rrd_memory_mode,
-            default_health_enabled,
-            1
+    localhost = rrdhost_create(
+            hostname
+            , registry_get_this_machine_guid()
+            , os_type
+            , default_rrd_update_every
+            , default_rrd_history_entries
+            , default_rrd_memory_mode
+            , default_health_enabled
+            , default_rrdpush_enabled
+            , default_rrdpush_destination
+            , default_rrdpush_api_key
+            , 1
     );
 }
 
@@ -369,6 +404,8 @@ void rrdhost_free(RRDHOST *host) {
     freez(host->os);
     freez(host->cache_dir);
     freez(host->varlib_dir);
+    freez(host->rrdpush_api_key);
+    freez(host->rrdpush_destination);
     freez(host->health_default_exec);
     freez(host->health_default_recipient);
     freez(host->health_log_filename);
index 218bcc39e35c73da3c0a38da4447893cd7261207..67bb552aa60ee78b8eccebfdccd69c6f1c09ebb0 100644 (file)
  *
  */
 
+#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+
 int default_rrdpush_enabled = 0;
-static char *rrdpush_destination = NULL;
-static char *rrdpush_api_key = NULL;
+char *default_rrdpush_destination = NULL;
+char *default_rrdpush_api_key = NULL;
 
 int rrdpush_init() {
-    default_rrdpush_enabled   = config_get_boolean(CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
-    rrdpush_destination       = config_get(CONFIG_SECTION_STREAM, "destination", "");
-    rrdpush_api_key           = config_get(CONFIG_SECTION_STREAM, "api key", "");
+    default_rrdpush_enabled     = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
+    default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
+    default_rrdpush_api_key     = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
 
-    if(default_rrdpush_enabled && (!rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key)) {
+    if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
         error("STREAM [send]: cannot enable sending thread - information is missing.");
         default_rrdpush_enabled = 0;
+        default_rrdpush_api_key = NULL;
+        default_rrdpush_destination = NULL;
     }
 
     return default_rrdpush_enabled;
@@ -237,14 +241,14 @@ void *rrdpush_sender_thread(void *ptr) {
     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
         error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
 
-    int timeout = (int)config_get_number(CONFIG_SECTION_STREAM, "timeout seconds", 60);
-    int default_port = (int)config_get_number(CONFIG_SECTION_STREAM, "default port", 19999);
-    size_t max_size = (size_t)config_get_number(CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
-    unsigned int reconnect_delay = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
-    remote_clock_resync_iterations = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations);
+    int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
+    int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
+    size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
+    unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
+    remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations);
     char connected_to[CONNECTED_TO_SIZE + 1] = "";
 
-    if(!host->rrdpush_enabled || !rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key)
+    if(!host->rrdpush_enabled || !host->rrdpush_destination || !*host->rrdpush_destination || !host->rrdpush_api_key || !*host->rrdpush_api_key)
         goto cleanup;
 
     // initialize rrdpush globals
@@ -276,11 +280,11 @@ void *rrdpush_sender_thread(void *ptr) {
             // they will be lost, so there is no point to do it
             host->rrdpush_connected = 0;
 
-            info("STREAM %s [send to %s]: connecting...", host->hostname, rrdpush_destination);
-            host->rrdpush_socket = connect_to_one_of(rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
+            info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_destination);
+            host->rrdpush_socket = connect_to_one_of(host->rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
 
             if(unlikely(host->rrdpush_socket == -1)) {
-                error("STREAM %s [send to %s]: failed to connect", host->hostname, rrdpush_destination);
+                error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_destination);
                 sleep(reconnect_delay);
                 continue;
             }
@@ -292,7 +296,7 @@ void *rrdpush_sender_thread(void *ptr) {
                     "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n"
                     "User-Agent: netdata-push-service/%s\r\n"
                     "Accept: */*\r\n\r\n"
-                      , rrdpush_api_key
+                      , host->rrdpush_api_key
                       , host->hostname
                       , host->machine_guid
                       , host->os
@@ -318,7 +322,7 @@ void *rrdpush_sender_thread(void *ptr) {
                 continue;
             }
 
-            if(strncmp(http, "STREAM", 6)) {
+            if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT))) {
                 close(host->rrdpush_socket);
                 host->rrdpush_socket = -1;
                 error("STREAM %s [send to %s]: server is not replying properly.", host->hostname, connected_to);
@@ -428,6 +432,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
     int history = default_rrd_history_entries;
     RRD_MEMORY_MODE mode = default_rrd_memory_mode;
     int health_enabled = default_health_enabled;
+    int rrdpush_enabled = default_rrdpush_enabled;
+    char *rrdpush_destination = default_rrdpush_destination;
+    char *rrdpush_api_key = default_rrdpush_api_key;
     time_t alarms_delay = 60;
 
     update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
@@ -446,10 +453,30 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
     alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay);
     alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay);
 
+    rrdpush_enabled = appconfig_get_boolean(&stream_config, key, "default proxy enabled", rrdpush_enabled);
+    rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled);
+
+    rrdpush_destination = appconfig_get(&stream_config, key, "default proxy destination", rrdpush_destination);
+    rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination);
+
+    rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
+    rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
+
     if(!strcmp(machine_guid, "localhost"))
         host = localhost;
     else
-        host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, (health_enabled == CONFIG_BOOLEAN_NO)?0:1);
+        host = rrdhost_find_or_create(
+                hostname
+                , machine_guid
+                , os
+                , update_every
+                , history
+                , mode
+                , (health_enabled != CONFIG_BOOLEAN_NO)
+                , (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+                , rrdpush_destination
+                , rrdpush_api_key
+        );
 
     if(!host) {
         close(fd);
@@ -457,7 +484,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
         return 1;
     }
 
-    info("STREAM %s [receive from [%s]:%s]: metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s"
+#ifdef NETDATA_INTERNAL_CHECKS
+    info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s"
          , hostname
          , client_ip
          , client_port
@@ -468,6 +496,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
          , rrd_memory_mode_name(host->rrd_memory_mode)
          , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
     );
+#endif // NETDATA_INTERNAL_CHECKS
 
     struct plugind cd = {
             .enabled = 1,
@@ -487,8 +516,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
     snprintfz(cd.cmd,          PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
 
     info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
-    if(send_timeout(fd, "STREAM", 6, 0, 60) != 6) {
-        error("STREAM %s [receive from [%s]:%s]: cannot send STREAM command.", host->hostname, client_ip, client_port);
+    if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
+        error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
         return 0;
     }
 
@@ -510,9 +539,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
     rrdhost_unlock(host);
 
     // call the plugins.d processor to receive the metrics
-    info("STREAM %s [receive from [%s]:%s]: receiving metrics... (host '%s', machine GUID '%s').", host->hostname, client_ip, client_port, host->hostname, host->machine_guid);
+    info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port);
     size_t count = pluginsd_process(host, &cd, fp, 1);
-    error("STREAM %s [receive from [%s]:%s]: disconnected (host '%s', machine GUID '%s', completed updates %zu).", host->hostname, client_ip, client_port, host->hostname, host->machine_guid, count);
+    error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count);
 
     rrdhost_wrlock(host);
     host->use_counter--;
@@ -564,10 +593,6 @@ void *rrdpush_receiver_thread(void *ptr) {
     return NULL;
 }
 
-static inline int rrdpush_receive_validate_api_key(const char *key) {
-    return appconfig_get_boolean(&stream_config, key, "enabled", 0);
-}
-
 void rrdpush_sender_thread_spawn(RRDHOST *host) {
     if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host))
         error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
@@ -585,6 +610,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
 
     char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
     int update_every = default_rrd_update_every;
+    char buf[GUID_LEN + 1];
 
     while(url) {
         char *value = mystrsep(&url, "?&");
@@ -627,8 +653,22 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
         return 400;
     }
 
-    if(!rrdpush_receive_validate_api_key(key)) {
-        error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+    if(regenerate_guid(key, buf) == -1) {
+        error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID. Forbidding access.", w->client_ip, w->client_port, key);
+        buffer_flush(w->response.data);
+        buffer_sprintf(w->response.data, "Your API key is invalid.");
+        return 401;
+    }
+
+    if(regenerate_guid(machine_guid, buf) == -1) {
+        error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, key);
+        buffer_flush(w->response.data);
+        buffer_sprintf(w->response.data, "Your machine GUID is invalid.");
+        return 404;
+    }
+
+    if(!appconfig_get_boolean(&stream_config, key, "enabled", 1)) {
+        error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
         buffer_flush(w->response.data);
         buffer_sprintf(w->response.data, "Your API key is not permitted access.");
         return 401;
index 6a7d8e59a93e750eb802c78e2b2988968af3f42f..ef3545f8f4e2967243039457491ed4697113b25f 100644 (file)
@@ -2,6 +2,8 @@
 #define NETDATA_RRDPUSH_H
 
 extern int default_rrdpush_enabled;
+extern char *default_rrdpush_destination;
+extern char *default_rrdpush_api_key;
 
 extern int rrdpush_init();
 extern void rrdset_done_push(RRDSET *st);