]> arthur.barton.de Git - netdata.git/blobdiff - python.d/python_modules/base.py
non-blocking `SocketService`
[netdata.git] / python.d / python_modules / base.py
index 29b65d911613540a2482f7df4f6ee35ed110f798..a074228b2fde3be78dc681cbca758ce5be312d4d 100644 (file)
@@ -6,7 +6,7 @@ import time
 import sys
 import os
 import socket
-import resource
+import select
 try:
     import urllib.request as urllib2
 except ImportError:
@@ -96,7 +96,7 @@ class BaseService(threading.Thread):
         if self.timetable['next'] > t_start:
             #msg.debug(self.chart_name + " will be run in " +
             #          str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
-            msg.debug(self.chart_name,"will be run in", str(int((self.timetable['next'] - t_start) * 1000)), "ms")
+            msg.debug(self.chart_name, "will be run in", str(int((self.timetable['next'] - t_start) * 1000)), "ms")
             return True
 
         since_last = int((t_start - self.timetable['last']) * 1000000)
@@ -109,6 +109,7 @@ class BaseService(threading.Thread):
                   "ms (update_every:", str(self.timetable['freq'] * 1000),
                   "ms, latency:", str(int((t_start - self.timetable['next']) * 1000)), "ms")
         if not self.update(since_last):
+            self.error("update function failed.")
             return False
         t_end = time.time()
         self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
@@ -137,7 +138,7 @@ class BaseService(threading.Thread):
             try:
                 status = self._run_once()
             except Exception as e:
-                msg.error("Something wrong: " + str(e))
+                msg.error("Something wrong: ", str(e))
                 return
             if status:
                 time.sleep(self.timetable['next'] - time.time())
@@ -150,7 +151,8 @@ class BaseService(threading.Thread):
                 else:
                     time.sleep(self.timetable['freq'])
 
-    def _format(self, *args):
+    @staticmethod
+    def _format(*args):
         params = []
         append = params.append
         for p in args:
@@ -383,6 +385,7 @@ class SimpleService(BaseService):
         """
         data = self._get_data()
         if data is None:
+            self.debug("_get_data() returned no data")
             return False
 
         updated = False
@@ -397,6 +400,8 @@ class SimpleService(BaseService):
                 self.end()
 
         self.commit()
+        if not updated:
+            self.error("no charts to update")
 
         return updated
 
@@ -424,13 +429,13 @@ class UrlService(SimpleService):
         try:
             f = urllib2.urlopen(self.url, timeout=self.update_every)
         except Exception as e:
-            msg.error(self.__module__, str(e))
+            self.error(str(e))
             return None
 
         try:
             raw = f.read().decode('utf-8')
         except Exception as e:
-            msg.error(self.__module__, str(e))
+            self.error(str(e))
         finally:
             f.close()
         return raw
@@ -469,97 +474,174 @@ class UrlService(SimpleService):
 
 class SocketService(SimpleService):
     def __init__(self, configuration=None, name=None):
+        self._sock = None
+        self._keep_alive = True
         self.host = "localhost"
         self.port = None
-        self.sock = None
         self.unix_socket = None
         self.request = ""
+        self.__socket_config = None
         SimpleService.__init__(self, configuration=configuration, name=name)
 
-    def _get_raw_data(self):
+    def _connect(self):
         """
-        Get raw data with low-level "socket" module.
-        :return: str
+        Recreate socket and connect to it since they cannot be reused
+        Available configurations are IPv6, IPv4 or UNIX socket
+        :return:
         """
-        if self.sock is None:
-            try:
-                if self.unix_socket is None:
-                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-                    #sock.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)
-                    #sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-                    #sock.settimeout(self.update_every)
-                    sock.settimeout(0.5)
-                    sock.connect((self.host, self.port))
-                    sock.settimeout(0.5)  # Just to be sure
+        try:
+            if self.unix_socket is None:
+                if self.__socket_config is None:
+                    # establish ipv6 or ipv4 connection.
+                    for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+                        try:
+                            af, socktype, proto, canonname, sa = res
+                            self._sock = socket.socket(af, socktype, proto)
+                        except socket.error as msg:
+                            self._sock = None
+                            continue
+                        try:
+                            self._sock.connect(sa)
+                        except socket.error as msg:
+                            self._disconnect()
+                            continue
+                        self.__socket_config = res
+                        break
                 else:
-                    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
-                    #sock.settimeout(self.update_every)
-                    sock.settimeout(0.05)
-                    sock.connect(self.unix_socket)
-                    sock.settimeout(0.05)  # Just to be sure
+                    # connect to socket with previously established configuration
+                    try:
+                        af, socktype, proto, canonname, sa = self.__socket_config
+                        self._sock = socket.socket(af, socktype, proto)
+                        self._sock.connect(sa)
+                    except socket.error as msg:
+                        self._disconnect()
+            else:
+                # connect to unix socket
+                self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+                self._sock.connect(self.unix_socket)
+        except Exception as e:
+            self.error(str(e),
+                       "Cannot create socket with following configuration: host:", str(self.host),
+                       "port:", str(self.port),
+                       "socket:", str(self.unix_socket))
+            self._sock = None
+        self._sock.setblocking(0)
 
-            except Exception as e:
-                self.error(str(e))
-                self.sock = None
-                return None
+    def _disconnect(self):
+        """
+        Close socket connection
+        :return:
+        """
+        try:
+            self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
+            self._sock.close()
+        except Exception:
+            pass
+        self._sock = None
 
+    def _send(self):
+        """
+        Send request.
+        :return: boolean
+        """
+        # Send request if it is needed
         if self.request != "".encode():
             try:
-                sock.send(self.request)
-            except Exception:
-                try:
-                    sock.shutdown(1)
-                    sock.close()
-                except:
-                    pass
-                self.sock = None
-                return None
+                self._sock.send(self.request)
+            except Exception as e:
+                self._disconnect()
+                self.error(str(e),
+                           "used configuration: host:", str(self.host),
+                           "port:", str(self.port),
+                           "socket:", str(self.unix_socket))
+                return False
+        return True
 
-        size = 2
-        try:
-            data = sock.recv(size).decode()
-        except Exception as e:
-            self.error(str(e))
-            sock.close()
+    def _receive(self):
+        """
+        Receive data from socket
+        :return: str
+        """
+        data = ""
+        while True:
+            try:
+                ready_to_read, _, in_error = select.select([self._sock], [], [], 0.01)
+            except Exception as e:
+                self.debug("SELECT", str(e))
+                self._disconnect()
+                break
+            if len(ready_to_read) > 0:
+                buf = self._sock.recv(4096)
+                if len(buf) == 0 or buf is None:
+                    break
+                data += buf.decode()
+            else:
+                self._disconnect()
+                break
+
+        return data
+
+    def _get_raw_data(self):
+        """
+        Get raw data with low-level "socket" module.
+        :return: str
+        """
+        if self._sock is None:
+            self._connect()
+
+        # Send request if it is needed
+        if not self._send():
             return None
 
-        while True:
-            # implement something like TCP Window Scaling
-            if size < 4096:
-                size *= 2
-            buf = sock.recv(size)
-            data += buf.decode()
-            if len(buf) < size:
+        finished = False
+        data = ""
+        prevent_infinite_loop = 1000000
+        while not finished:
+            data += self._receive()
+            finished = self._check_raw_data(data)
+            prevent_infinite_loop -= 1
+            if prevent_infinite_loop <= 0:
+                self.debug("Almost got into infinite loop while grabbing data. Is _check_raw_data() ok?")
                 break
 
+        if not self._keep_alive:
+            self._disconnect()
+
         return data
 
+    def _check_raw_data(self, data):
+        """
+        Check if all data has been gathered from socket
+        :param data: str
+        :return: boolean
+        """
+        return True
+
     def _parse_config(self):
         """
         Parse configuration data
         :return: boolean
         """
-        if self.name is not None or self.name != str(None):
+        if self.name is None or self.name == str(None):
             self.name = ""
         else:
             self.name = str(self.name)
         try:
             self.unix_socket = str(self.configuration['socket'])
         except (KeyError, TypeError):
-            self.error("No unix socket specified. Trying TCP/IP socket.")
+            self.debug("No unix socket specified. Trying TCP/IP socket.")
             try:
                 self.host = str(self.configuration['host'])
             except (KeyError, TypeError):
-                self.error("No host specified. Using: '" + self.host + "'")
+                self.debug("No host specified. Using: '" + self.host + "'")
             try:
                 self.port = int(self.configuration['port'])
             except (KeyError, TypeError):
-                self.error("No port specified. Using: '" + str(self.port) + "'")
+                self.debug("No port specified. Using: '" + str(self.port) + "'")
         try:
             self.request = str(self.configuration['request'])
         except (KeyError, TypeError):
-            self.error("No request specified. Using: '" + str(self.request) + "'")
+            self.debug("No request specified. Using: '" + str(self.request) + "'")
         self.request = self.request.encode()
 
 
@@ -581,6 +663,7 @@ class LogService(SimpleService):
             if os.path.getsize(self.log_path) < self._last_position:
                 self._last_position = 0
             elif os.path.getsize(self.log_path) == self._last_position:
+                self.debug("Log file hasn't changed. No new data.")
                 return None
             with open(self.log_path, "r") as fp:
                 fp.seek(self._last_position)
@@ -588,11 +671,13 @@ class LogService(SimpleService):
                     lines.append(line)
                 self._last_position = fp.tell()
         except Exception as e:
-            self.error(self.__module__, str(e))
+            self.error(str(e))
 
         if len(lines) != 0:
             return lines
-        return None
+        else:
+            self.error("No data collected.")
+            return None
 
     def check(self):
         """
@@ -621,7 +706,7 @@ class LogService(SimpleService):
 
 
 class ExecutableService(SimpleService):
-    command_whitelist = ['exim', 'postqueue']
+    #command_whitelist = ['exim', 'postqueue']
     bad_substrings = ('&', '|', ';', '>', '<')
 
     def __init__(self, configuration=None, name=None):
@@ -636,12 +721,16 @@ class ExecutableService(SimpleService):
         try:
             p = Popen(self.command, stdout=PIPE, stderr=PIPE)
         except Exception as e:
-            self.error(self.__module__, str(e))
+            self.error("Executing command", self.command, "resulted in error:", str(e))
             return None
         data = []
         for line in p.stdout.readlines():
             data.append(str(line.decode()))
 
+        if len(data) == 0:
+            self.error("No data collected.")
+            return None
+
         return data
 
     def check(self):
@@ -653,21 +742,21 @@ class ExecutableService(SimpleService):
             self.name = ""
         else:
             self.name = str(self.name)
-        try:
-        #     self.command = str(self.configuration['path'])
-        except (KeyError, TypeError):
-            self.error("No command specified. Using: '" + self.command + "'")
+        try:
+            self.command = str(self.configuration['command'])
+        except (KeyError, TypeError):
+            self.error("No command specified. Using: '" + self.command + "'")
         self.command = self.command.split(' ')
-        if self.command[0] not in self.command_whitelist:
-            self.error("Command is not whitelisted.")
-            return False
+        #if self.command[0] not in self.command_whitelist:
+        #    self.error("Command is not whitelisted.")
+        #    return False
 
         for arg in self.command[1:]:
             if any(st in arg for st in self.bad_substrings):
                 self.error("Bad command argument:" + " ".join(self.command[1:]))
                 return False
         # test command and search for it in /usr/sbin or /sbin when failed
-        base = self.command[0]
+        base = self.command[0].split('/')[-1]
         if self._get_raw_data() is None:
             for prefix in ['/sbin/', '/usr/sbin/']:
                 self.command[0] = prefix + base
@@ -677,5 +766,6 @@ class ExecutableService(SimpleService):
                 #    break
 
         if self._get_data() is None or len(self._get_data()) == 0:
+            self.error("Command", self.command, "returned no data")
             return False
         return True