]> arthur.barton.de Git - netdata.git/blobdiff - python.d/python_modules/base.py
non-blocking `SocketService`
[netdata.git] / python.d / python_modules / base.py
index 786cd7f61603e34e3007e54b5447425d9660920b..a074228b2fde3be78dc681cbca758ce5be312d4d 100644 (file)
@@ -6,6 +6,7 @@ import time
 import sys
 import os
 import socket
+import select
 try:
     import urllib.request as urllib2
 except ImportError:
@@ -384,6 +385,7 @@ class SimpleService(BaseService):
         """
         data = self._get_data()
         if data is None:
+            self.debug("_get_data() returned no data")
             return False
 
         updated = False
@@ -398,6 +400,8 @@ class SimpleService(BaseService):
                 self.end()
 
         self.commit()
+        if not updated:
+            self.error("no charts to update")
 
         return updated
 
@@ -425,13 +429,13 @@ class UrlService(SimpleService):
         try:
             f = urllib2.urlopen(self.url, timeout=self.update_every)
         except Exception as e:
-            msg.error(str(e))
+            self.error(str(e))
             return None
 
         try:
             raw = f.read().decode('utf-8')
         except Exception as e:
-            msg.error(str(e))
+            self.error(str(e))
         finally:
             f.close()
         return raw
@@ -470,73 +474,149 @@ class UrlService(SimpleService):
 
 class SocketService(SimpleService):
     def __init__(self, configuration=None, name=None):
+        self._sock = None
+        self._keep_alive = True
         self.host = "localhost"
         self.port = None
-        self.sock = None
         self.unix_socket = None
         self.request = ""
+        self.__socket_config = None
         SimpleService.__init__(self, configuration=configuration, name=name)
 
-    def _get_raw_data(self):
+    def _connect(self):
         """
-        Get raw data with low-level "socket" module.
-        :return: str
+        Recreate socket and connect to it since they cannot be reused
+        Available configurations are IPv6, IPv4 or UNIX socket
+        :return:
         """
-        if self.sock is None:
-            try:
-                if self.unix_socket is None:
-                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-                    #sock.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)
-                    #sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-                    #sock.settimeout(self.update_every)
-                    sock.settimeout(0.5)
-                    sock.connect((self.host, self.port))
-                    sock.settimeout(0.5)  # Just to be sure
+        try:
+            if self.unix_socket is None:
+                if self.__socket_config is None:
+                    # establish ipv6 or ipv4 connection.
+                    for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+                        try:
+                            af, socktype, proto, canonname, sa = res
+                            self._sock = socket.socket(af, socktype, proto)
+                        except socket.error as msg:
+                            self._sock = None
+                            continue
+                        try:
+                            self._sock.connect(sa)
+                        except socket.error as msg:
+                            self._disconnect()
+                            continue
+                        self.__socket_config = res
+                        break
                 else:
-                    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
-                    #sock.settimeout(self.update_every)
-                    sock.settimeout(0.05)
-                    sock.connect(self.unix_socket)
-                    sock.settimeout(0.05)  # Just to be sure
+                    # connect to socket with previously established configuration
+                    try:
+                        af, socktype, proto, canonname, sa = self.__socket_config
+                        self._sock = socket.socket(af, socktype, proto)
+                        self._sock.connect(sa)
+                    except socket.error as msg:
+                        self._disconnect()
+            else:
+                # connect to unix socket
+                self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+                self._sock.connect(self.unix_socket)
+        except Exception as e:
+            self.error(str(e),
+                       "Cannot create socket with following configuration: host:", str(self.host),
+                       "port:", str(self.port),
+                       "socket:", str(self.unix_socket))
+            self._sock = None
+        self._sock.setblocking(0)
 
-            except Exception as e:
-                self.error(str(e))
-                self.sock = None
-                return None
+    def _disconnect(self):
+        """
+        Close socket connection
+        :return:
+        """
+        try:
+            self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
+            self._sock.close()
+        except Exception:
+            pass
+        self._sock = None
 
+    def _send(self):
+        """
+        Send request.
+        :return: boolean
+        """
+        # Send request if it is needed
         if self.request != "".encode():
             try:
-                sock.send(self.request)
+                self._sock.send(self.request)
             except Exception as e:
-                try:
-                    sock.shutdown(1)
-                    sock.close()
-                except:
-                    pass
-                self.sock = None
-                self.error(str(e))
-                return None
+                self._disconnect()
+                self.error(str(e),
+                           "used configuration: host:", str(self.host),
+                           "port:", str(self.port),
+                           "socket:", str(self.unix_socket))
+                return False
+        return True
 
-        size = 2
-        try:
-            data = sock.recv(size).decode()
-        except Exception as e:
-            self.error(str(e))
-            sock.close()
+    def _receive(self):
+        """
+        Receive data from socket
+        :return: str
+        """
+        data = ""
+        while True:
+            try:
+                ready_to_read, _, in_error = select.select([self._sock], [], [], 0.01)
+            except Exception as e:
+                self.debug("SELECT", str(e))
+                self._disconnect()
+                break
+            if len(ready_to_read) > 0:
+                buf = self._sock.recv(4096)
+                if len(buf) == 0 or buf is None:
+                    break
+                data += buf.decode()
+            else:
+                self._disconnect()
+                break
+
+        return data
+
+    def _get_raw_data(self):
+        """
+        Get raw data with low-level "socket" module.
+        :return: str
+        """
+        if self._sock is None:
+            self._connect()
+
+        # Send request if it is needed
+        if not self._send():
             return None
 
-        while True:
-            # implement something like TCP Window Scaling
-            if size < 4096:
-                size *= 2
-            buf = sock.recv(size)
-            data += buf.decode()
-            if len(buf) < size:
+        finished = False
+        data = ""
+        prevent_infinite_loop = 1000000
+        while not finished:
+            data += self._receive()
+            finished = self._check_raw_data(data)
+            prevent_infinite_loop -= 1
+            if prevent_infinite_loop <= 0:
+                self.debug("Almost got into infinite loop while grabbing data. Is _check_raw_data() ok?")
                 break
 
+        if not self._keep_alive:
+            self._disconnect()
+
         return data
 
+    def _check_raw_data(self, data):
+        """
+        Check if all data has been gathered from socket
+        :param data: str
+        :return: boolean
+        """
+        return True
+
     def _parse_config(self):
         """
         Parse configuration data
@@ -549,19 +629,19 @@ class SocketService(SimpleService):
         try:
             self.unix_socket = str(self.configuration['socket'])
         except (KeyError, TypeError):
-            self.error("No unix socket specified. Trying TCP/IP socket.")
+            self.debug("No unix socket specified. Trying TCP/IP socket.")
             try:
                 self.host = str(self.configuration['host'])
             except (KeyError, TypeError):
-                self.error("No host specified. Using: '" + self.host + "'")
+                self.debug("No host specified. Using: '" + self.host + "'")
             try:
                 self.port = int(self.configuration['port'])
             except (KeyError, TypeError):
-                self.error("No port specified. Using: '" + str(self.port) + "'")
+                self.debug("No port specified. Using: '" + str(self.port) + "'")
         try:
             self.request = str(self.configuration['request'])
         except (KeyError, TypeError):
-            self.error("No request specified. Using: '" + str(self.request) + "'")
+            self.debug("No request specified. Using: '" + str(self.request) + "'")
         self.request = self.request.encode()
 
 
@@ -641,7 +721,7 @@ class ExecutableService(SimpleService):
         try:
             p = Popen(self.command, stdout=PIPE, stderr=PIPE)
         except Exception as e:
-            self.error(str(e))
+            self.error("Executing command", self.command, "resulted in error:", str(e))
             return None
         data = []
         for line in p.stdout.readlines():
@@ -686,5 +766,6 @@ class ExecutableService(SimpleService):
                 #    break
 
         if self._get_data() is None or len(self._get_data()) == 0:
+            self.error("Command", self.command, "returned no data")
             return False
         return True