From 7c78678d6f065d874b8c37b64aeb75f37ba4a2d9 Mon Sep 17 00:00:00 2001 From: paulfantom Date: Mon, 18 Jul 2016 22:56:08 +0200 Subject: [PATCH] non-blocking `SocketService` --- python.d/python_modules/base.py | 175 +++++++++++++++++++++----------- python.d/redis.chart.py | 20 +++- python.d/squid.chart.py | 20 ++-- 3 files changed, 146 insertions(+), 69 deletions(-) diff --git a/python.d/python_modules/base.py b/python.d/python_modules/base.py index 799ff954..a074228b 100644 --- a/python.d/python_modules/base.py +++ b/python.d/python_modules/base.py @@ -6,6 +6,7 @@ import time import sys import os import socket +import select try: import urllib.request as urllib2 except ImportError: @@ -479,92 +480,142 @@ class SocketService(SimpleService): self.port = None self.unix_socket = None self.request = "" + self.__socket_config = None SimpleService.__init__(self, configuration=configuration, name=name) - def _get_raw_data(self): + def _connect(self): """ - Get raw data with low-level "socket" module. - :return: str + Recreate socket and connect to it since they cannot be reused + Available configurations are IPv6, IPv4 or UNIX socket + :return: """ - # Recreate socket since they cannot be reused - if self._sock is None: - try: - if self.unix_socket is None: - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - #sock.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1) - #sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - #sock.settimeout(self.update_every) - self._sock.settimeout(0.5) - self._sock.connect((self.host, self.port)) - self._sock.settimeout(0.5) # Just to be sure + try: + if self.unix_socket is None: + if self.__socket_config is None: + # establish ipv6 or ipv4 connection. + for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): + try: + af, socktype, proto, canonname, sa = res + self._sock = socket.socket(af, socktype, proto) + except socket.error as msg: + self._sock = None + continue + try: + self._sock.connect(sa) + except socket.error as msg: + self._disconnect() + continue + self.__socket_config = res + break else: - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - #self._sock.settimeout(self.update_every) - self._sock.settimeout(0.05) - self._sock.connect(self.unix_socket) - self._sock.settimeout(0.05) # Just to be sure - except Exception as e: - self.error(str(e), - "Cannot create socket with following configuration: host:", str(self.host), - "port:", str(self.port), - "socket:", str(self.unix_socket)) - self._sock = None - return None + # connect to socket with previously established configuration + try: + af, socktype, proto, canonname, sa = self.__socket_config + self._sock = socket.socket(af, socktype, proto) + self._sock.connect(sa) + except socket.error as msg: + self._disconnect() + else: + # connect to unix socket + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + self._sock.connect(self.unix_socket) + except Exception as e: + self.error(str(e), + "Cannot create socket with following configuration: host:", str(self.host), + "port:", str(self.port), + "socket:", str(self.unix_socket)) + self._sock = None + self._sock.setblocking(0) + + def _disconnect(self): + """ + Close socket connection + :return: + """ + try: + self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all + self._sock.close() + except Exception: + pass + self._sock = None + def _send(self): + """ + Send request. + :return: boolean + """ # Send request if it is needed if self.request != "".encode(): try: self._sock.send(self.request) except Exception as e: - try: - self._sock.shutdown(0) - self._sock.close() - self._sock = None - except: - pass + self._disconnect() self.error(str(e), "used configuration: host:", str(self.host), "port:", str(self.port), "socket:", str(self.unix_socket)) - return None - - # Receive first two bytes from socket. This will test if there is anything to receive - size = 2 - try: - data = self._sock.recv(size).decode() - except Exception as e: - self.error(str(e), - "used configuration: host:", str(self.host), - "port:", str(self.port), - "socket:", str(self.unix_socket)) - self._sock.shutdown(0) - self._sock.close() - self._sock = None - return None + return False + return True - # Receive the rest + def _receive(self): + """ + Receive data from socket + :return: str + """ + data = "" while True: - # implement something like TCP Window Scaling - if size < 4096: - size *= 2 - buf = self._sock.recv(size) - data += buf.decode() - if len(buf) == 0: break # precaution - if len(buf) < size and not self._more_data_available(): + try: + ready_to_read, _, in_error = select.select([self._sock], [], [], 0.01) + except Exception as e: + self.debug("SELECT", str(e)) + self._disconnect() break + if len(ready_to_read) > 0: + buf = self._sock.recv(4096) + if len(buf) == 0 or buf is None: + break + data += buf.decode() else: - size = 4 + self._disconnect() + break + + return data + + def _get_raw_data(self): + """ + Get raw data with low-level "socket" module. + :return: str + """ + if self._sock is None: + self._connect() + + # Send request if it is needed + if not self._send(): + return None + + finished = False + data = "" + prevent_infinite_loop = 1000000 + while not finished: + data += self._receive() + finished = self._check_raw_data(data) + prevent_infinite_loop -= 1 + if prevent_infinite_loop <= 0: + self.debug("Almost got into infinite loop while grabbing data. Is _check_raw_data() ok?") + break if not self._keep_alive: - self._sock.shutdown(0) - self._sock.close() - self._sock = None + self._disconnect() return data - def _more_data_available(self): - return False + def _check_raw_data(self, data): + """ + Check if all data has been gathered from socket + :param data: str + :return: boolean + """ + return True def _parse_config(self): """ diff --git a/python.d/redis.chart.py b/python.d/redis.chart.py index 8268ff14..f4655ac2 100644 --- a/python.d/redis.chart.py +++ b/python.d/redis.chart.py @@ -100,6 +100,24 @@ class Service(SocketService): else: return data + def _check_raw_data(self, data): + """ + Check if all data has been gathered from socket. + Parse first line containing message length and check against received message + :param data: str + :return: boolean + """ + length = len(data) + supposed = data.split('\n')[0][1:] + offset = len(supposed) + 4 # 1 dollar sing, 1 new line character + 1 ending sequence '\r\n' + supposed = int(supposed) + if length - offset >= supposed: + return True + else: + return False + + return False + def check(self): """ Parse configuration, check if redis is available, and dynamically create chart lines data @@ -108,7 +126,7 @@ class Service(SocketService): self._parse_config() if self.name == "": self.name = "local" - self.chart_name += "_" + self.name + self.chart_name += "_" + self.name data = self._get_data() if data is None: return False diff --git a/python.d/squid.chart.py b/python.d/squid.chart.py index c641f3e8..8ad9a794 100644 --- a/python.d/squid.chart.py +++ b/python.d/squid.chart.py @@ -72,22 +72,30 @@ class Service(SocketService): self.error("invalid data received") return None + self.debug("DATA:", str(len(raw))) if len(data) == 0: self.error("no data received") return None else: return data - def _more_data_available(self): - try: - ready_to_read, _, in_error = select.select([self._sock, ], [], [], 0.05) - except Exception as e: - self.debug("select returned exception", str(e)) - if len(ready_to_read) > 0: + def _check_raw_data(self, data): + if "Content-Length" not in data[:1024]: # assuming headers should be in first 1024 bytes (performance) + return True # "Content-Length" not found, assume everything is ok + + # TODO write some parser of "Content-Length" + return True + length = len(data) + + + supposed = 0 + if length >= supposed: return True else: return False + return False + def check(self): """ Parse essential configuration, autodetect squid configuration (if needed), and check if data is available -- 2.39.2