rrdpush_unlock(host);
}
-void rrdpush_sender_thread_cleanup(RRDHOST *host) {
+static void rrdpush_sender_thread_cleanup(RRDHOST *host) {
rrdpush_lock(host);
host->rrdpush_connected = 0;
- if(host->rrdpush_socket != -1) close(host->rrdpush_socket);
+ if(host->rrdpush_socket != -1) {
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
+ }
// close the pipe
- if(host->rrdpush_pipe[PIPE_READ] != -1) close(host->rrdpush_pipe[PIPE_READ]);
- if(host->rrdpush_pipe[PIPE_WRITE] != -1) close(host->rrdpush_pipe[PIPE_WRITE]);
- host->rrdpush_pipe[PIPE_READ] = -1;
- host->rrdpush_pipe[PIPE_WRITE] = -1;
+ if(host->rrdpush_pipe[PIPE_READ] != -1) {
+ close(host->rrdpush_pipe[PIPE_READ]);
+ host->rrdpush_pipe[PIPE_READ] = -1;
+ }
+
+ if(host->rrdpush_pipe[PIPE_WRITE] != -1) {
+ close(host->rrdpush_pipe[PIPE_WRITE]);
+ host->rrdpush_pipe[PIPE_WRITE] = -1;
+ }
buffer_free(host->rrdpush_buffer);
host->rrdpush_buffer = NULL;
host->rrdpush_spawn = 0;
- host->rrdpush_enabled = 0;
rrdpush_unlock(host);
}
+void rrdpush_sender_thread_stop(RRDHOST *host) {
+ rrdhost_check_wrlock(host);
+
+ if(host->rrdpush_spawn) {
+ info("STREAM %s [send]: stopping sending thread...", host->hostname);
+ pthread_cancel(host->rrdpush_thread);
+ rrdpush_sender_thread_cleanup(host);
+ }
+}
+
void *rrdpush_sender_thread(void *ptr) {
RRDHOST *host = (RRDHOST *)ptr;
if(health_enabled == CONFIG_BOOLEAN_AUTO)
host->health_enabled = 0;
- if(host->rrdpush_enabled && host->rrdpush_spawn) {
- pthread_cancel(host->rrdpush_thread);
- rrdpush_sender_thread_cleanup(host);
- }
+ rrdpush_sender_thread_stop(host);
}
rrdhost_unlock(host);