]> arthur.barton.de Git - netdata.git/blobdiff - src/rrdpush.c
avoid code duplication
[netdata.git] / src / rrdpush.c
index 0e7f0aedb170fabade8dbe8db6a33bf7f3070c82..a283495555528aa6bbf0138d8f8f59843c67bf49 100644 (file)
@@ -206,28 +206,45 @@ static inline void rrdpush_sender_thread_unlock(RRDHOST *host) {
     rrdpush_unlock(host);
 }
 
-void rrdpush_sender_thread_cleanup(RRDHOST *host) {
+static void rrdpush_sender_thread_cleanup(RRDHOST *host) {
     rrdpush_lock(host);
 
     host->rrdpush_connected = 0;
 
-    if(host->rrdpush_socket != -1) close(host->rrdpush_socket);
+    if(host->rrdpush_socket != -1) {
+        close(host->rrdpush_socket);
+        host->rrdpush_socket = -1;
+    }
 
     // close the pipe
-    if(host->rrdpush_pipe[PIPE_READ] != -1)  close(host->rrdpush_pipe[PIPE_READ]);
-    if(host->rrdpush_pipe[PIPE_WRITE] != -1) close(host->rrdpush_pipe[PIPE_WRITE]);
-    host->rrdpush_pipe[PIPE_READ] = -1;
-    host->rrdpush_pipe[PIPE_WRITE] = -1;
+    if(host->rrdpush_pipe[PIPE_READ] != -1) {
+        close(host->rrdpush_pipe[PIPE_READ]);
+        host->rrdpush_pipe[PIPE_READ] = -1;
+    }
+
+    if(host->rrdpush_pipe[PIPE_WRITE] != -1) {
+        close(host->rrdpush_pipe[PIPE_WRITE]);
+        host->rrdpush_pipe[PIPE_WRITE] = -1;
+    }
 
     buffer_free(host->rrdpush_buffer);
     host->rrdpush_buffer = NULL;
 
     host->rrdpush_spawn = 0;
-    host->rrdpush_enabled = 0;
 
     rrdpush_unlock(host);
 }
 
+void rrdpush_sender_thread_stop(RRDHOST *host) {
+    rrdhost_check_wrlock(host);
+
+    if(host->rrdpush_spawn) {
+        info("STREAM %s [send]: stopping sending thread...", host->hostname);
+        pthread_cancel(host->rrdpush_thread);
+        rrdpush_sender_thread_cleanup(host);
+    }
+}
+
 void *rrdpush_sender_thread(void *ptr) {
     RRDHOST *host = (RRDHOST *)ptr;
 
@@ -547,10 +564,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
         if(health_enabled == CONFIG_BOOLEAN_AUTO)
             host->health_enabled = 0;
 
-        if(host->rrdpush_enabled && host->rrdpush_spawn) {
-            pthread_cancel(host->rrdpush_thread);
-            rrdpush_sender_thread_cleanup(host);
-        }
+        rrdpush_sender_thread_stop(host);
     }
     rrdhost_unlock(host);