]> arthur.barton.de Git - netdata.git/commitdiff
non-blocking `SocketService`
authorpaulfantom <paulfantom@gmail.com>
Mon, 18 Jul 2016 20:56:08 +0000 (22:56 +0200)
committerpaulfantom <paulfantom@gmail.com>
Mon, 18 Jul 2016 20:56:08 +0000 (22:56 +0200)
python.d/python_modules/base.py
python.d/redis.chart.py
python.d/squid.chart.py

index 799ff954e11218e7a87cd52f863a28257642b2c8..a074228b2fde3be78dc681cbca758ce5be312d4d 100644 (file)
@@ -6,6 +6,7 @@ import time
 import sys
 import os
 import socket
+import select
 try:
     import urllib.request as urllib2
 except ImportError:
@@ -479,92 +480,142 @@ class SocketService(SimpleService):
         self.port = None
         self.unix_socket = None
         self.request = ""
+        self.__socket_config = None
         SimpleService.__init__(self, configuration=configuration, name=name)
 
-    def _get_raw_data(self):
+    def _connect(self):
         """
-        Get raw data with low-level "socket" module.
-        :return: str
+        Recreate socket and connect to it since they cannot be reused
+        Available configurations are IPv6, IPv4 or UNIX socket
+        :return:
         """
-        # Recreate socket since they cannot be reused
-        if self._sock is None:
-            try:
-                if self.unix_socket is None:
-                    self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                    self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-                    #sock.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)
-                    #sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-                    #sock.settimeout(self.update_every)
-                    self._sock.settimeout(0.5)
-                    self._sock.connect((self.host, self.port))
-                    self._sock.settimeout(0.5)  # Just to be sure
+        try:
+            if self.unix_socket is None:
+                if self.__socket_config is None:
+                    # establish ipv6 or ipv4 connection.
+                    for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+                        try:
+                            af, socktype, proto, canonname, sa = res
+                            self._sock = socket.socket(af, socktype, proto)
+                        except socket.error as msg:
+                            self._sock = None
+                            continue
+                        try:
+                            self._sock.connect(sa)
+                        except socket.error as msg:
+                            self._disconnect()
+                            continue
+                        self.__socket_config = res
+                        break
                 else:
-                    self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
-                    #self._sock.settimeout(self.update_every)
-                    self._sock.settimeout(0.05)
-                    self._sock.connect(self.unix_socket)
-                    self._sock.settimeout(0.05)  # Just to be sure
-            except Exception as e:
-                self.error(str(e),
-                           "Cannot create socket with following configuration: host:", str(self.host),
-                           "port:", str(self.port),
-                           "socket:", str(self.unix_socket))
-                self._sock = None
-                return None
+                    # connect to socket with previously established configuration
+                    try:
+                        af, socktype, proto, canonname, sa = self.__socket_config
+                        self._sock = socket.socket(af, socktype, proto)
+                        self._sock.connect(sa)
+                    except socket.error as msg:
+                        self._disconnect()
+            else:
+                # connect to unix socket
+                self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+                self._sock.connect(self.unix_socket)
+        except Exception as e:
+            self.error(str(e),
+                       "Cannot create socket with following configuration: host:", str(self.host),
+                       "port:", str(self.port),
+                       "socket:", str(self.unix_socket))
+            self._sock = None
+        self._sock.setblocking(0)
+
+    def _disconnect(self):
+        """
+        Close socket connection
+        :return:
+        """
+        try:
+            self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
+            self._sock.close()
+        except Exception:
+            pass
+        self._sock = None
 
+    def _send(self):
+        """
+        Send request.
+        :return: boolean
+        """
         # Send request if it is needed
         if self.request != "".encode():
             try:
                 self._sock.send(self.request)
             except Exception as e:
-                try:
-                    self._sock.shutdown(0)
-                    self._sock.close()
-                    self._sock = None
-                except:
-                    pass
+                self._disconnect()
                 self.error(str(e),
                            "used configuration: host:", str(self.host),
                            "port:", str(self.port),
                            "socket:", str(self.unix_socket))
-                return None
-
-        # Receive first two bytes from socket. This will test if there is anything to receive
-        size = 2
-        try:
-            data = self._sock.recv(size).decode()
-        except Exception as e:
-            self.error(str(e),
-                       "used configuration: host:", str(self.host),
-                       "port:", str(self.port),
-                       "socket:", str(self.unix_socket))
-            self._sock.shutdown(0)
-            self._sock.close()
-            self._sock = None
-            return None
+                return False
+        return True
 
-        # Receive the rest
+    def _receive(self):
+        """
+        Receive data from socket
+        :return: str
+        """
+        data = ""
         while True:
-            # implement something like TCP Window Scaling
-            if size < 4096:
-                size *= 2
-            buf = self._sock.recv(size)
-            data += buf.decode()
-            if len(buf) == 0: break  # precaution
-            if len(buf) < size and not self._more_data_available():
+            try:
+                ready_to_read, _, in_error = select.select([self._sock], [], [], 0.01)
+            except Exception as e:
+                self.debug("SELECT", str(e))
+                self._disconnect()
                 break
+            if len(ready_to_read) > 0:
+                buf = self._sock.recv(4096)
+                if len(buf) == 0 or buf is None:
+                    break
+                data += buf.decode()
             else:
-                size = 4
+                self._disconnect()
+                break
+
+        return data
+
+    def _get_raw_data(self):
+        """
+        Get raw data with low-level "socket" module.
+        :return: str
+        """
+        if self._sock is None:
+            self._connect()
+
+        # Send request if it is needed
+        if not self._send():
+            return None
+
+        finished = False
+        data = ""
+        prevent_infinite_loop = 1000000
+        while not finished:
+            data += self._receive()
+            finished = self._check_raw_data(data)
+            prevent_infinite_loop -= 1
+            if prevent_infinite_loop <= 0:
+                self.debug("Almost got into infinite loop while grabbing data. Is _check_raw_data() ok?")
+                break
 
         if not self._keep_alive:
-            self._sock.shutdown(0)
-            self._sock.close()
-            self._sock = None
+            self._disconnect()
 
         return data
 
-    def _more_data_available(self):
-        return False
+    def _check_raw_data(self, data):
+        """
+        Check if all data has been gathered from socket
+        :param data: str
+        :return: boolean
+        """
+        return True
 
     def _parse_config(self):
         """
index 8268ff14dd4b2bdae8a86386b4f3051f1bbddfbb..f4655ac23c9850c221bb7dd43b15e77e37374e54 100644 (file)
@@ -100,6 +100,24 @@ class Service(SocketService):
         else:
             return data
 
+    def _check_raw_data(self, data):
+        """
+        Check if all data has been gathered from socket.
+        Parse first line containing message length and check against received message
+        :param data: str
+        :return: boolean
+        """
+        length = len(data)
+        supposed = data.split('\n')[0][1:]
+        offset = len(supposed) + 4  # 1 dollar sing, 1 new line character + 1 ending sequence '\r\n'
+        supposed = int(supposed)
+        if length - offset >= supposed:
+            return True
+        else:
+            return False
+
+        return False
+
     def check(self):
         """
         Parse configuration, check if redis is available, and dynamically create chart lines data
@@ -108,7 +126,7 @@ class Service(SocketService):
         self._parse_config()
         if self.name == "":
             self.name = "local"
-        self.chart_name += "_" + self.name
+            self.chart_name += "_" + self.name
         data = self._get_data()
         if data is None:
             return False
index c641f3e86d9fe4efe520e245c2a83635cd0a55ea..8ad9a794693bf7700e3e4b5f2a462d0290ea81b9 100644 (file)
@@ -72,22 +72,30 @@ class Service(SocketService):
             self.error("invalid data received")
             return None
 
+        self.debug("DATA:", str(len(raw)))
         if len(data) == 0:
             self.error("no data received")
             return None
         else:
             return data
 
-    def _more_data_available(self):
-        try:
-            ready_to_read, _, in_error = select.select([self._sock, ], [], [], 0.05)
-        except Exception as e:
-            self.debug("select returned exception", str(e))
-        if len(ready_to_read) > 0:
+    def _check_raw_data(self, data):
+        if "Content-Length" not in data[:1024]:  # assuming headers should be in first 1024 bytes (performance)
+            return True  # "Content-Length" not found, assume everything is ok
+
+        # TODO write some parser of "Content-Length"
+        return True
+        length = len(data)
+
+
+        supposed = 0
+        if length >= supposed:
             return True
         else:
             return False
 
+        return False
+
     def check(self):
         """
         Parse essential configuration, autodetect squid configuration (if needed), and check if data is available