]> arthur.barton.de Git - netdata.git/commitdiff
Merge pull request #1940 from l2isbad/elasticsearch_plugin_fixes
authorCosta Tsaousis <costa@tsaousis.gr>
Tue, 14 Mar 2017 10:11:05 +0000 (12:11 +0200)
committerGitHub <noreply@github.com>
Tue, 14 Mar 2017 10:11:05 +0000 (12:11 +0200)
Elasticsearch plugin fixes

python.d/elasticsearch.chart.py
python.d/python_modules/base.py

index de74a395ad1a3a43bab66fd82a00721843830fef..430227f69b0b3daf28e21c22f9312b90d1e8b442 100644 (file)
@@ -3,14 +3,14 @@
 # Author: l2isbad
 
 from base import UrlService
-from requests import get
-from socket import gethostbyname
+from socket import gethostbyname, gaierror
 try:
         from queue import Queue
 except ImportError:
         from Queue import Queue
 from threading import Thread
 from collections import namedtuple
+from json import loads
 
 # default module values (can be overridden per job in `config`)
 # update_every = 2
@@ -294,30 +294,33 @@ class Service(UrlService):
         self.order = ORDER
         self.definitions = CHARTS
         self.host = self.configuration.get('host')
-        self.port = self.configuration.get('port')
-        self.user = self.configuration.get('user')
-        self.password = self.configuration.get('pass')
+        self.port = self.configuration.get('port', 9200)
+        self.scheme = self.configuration.get('scheme', 'http')
         self.latency = dict()
         self.methods = list()
-        self.auth = self.user and self.password
 
     def check(self):
         # We can't start if <host> AND <port> not specified
-        if not all([self.host, self.port]):
+        if not all([self.host, self.port, isinstance(self.host, str), isinstance(self.port, (str, int))]):
+            self.error('Host is not defined in the module configuration file')
             return False
 
         # It as a bad idea to use hostname.
         # Hostname -> ip address
         try:
             self.host = gethostbyname(self.host)
-        except Exception as error:
+        except gaierror as error:
             self.error(str(error))
             return False
 
+        scheme = 'http' if self.scheme else 'https'
+        # Add handlers (auth, self signed cert accept)
+        self.url = '%s://%s:%s' % (scheme, self.host, self.port)
+        self._UrlService__add_openers()
         # Create URL for every Elasticsearch API
-        url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
-        url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
-        url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)
+        url_node_stats = '%s://%s:%s/_nodes/_local/stats' % (scheme, self.host, self.port)
+        url_cluster_health = '%s://%s:%s/_cluster/health' % (scheme, self.host, self.port)
+        url_cluster_stats = '%s://%s:%s/_cluster/stats' % (scheme, self.host, self.port)
 
         # Create list of enabled API calls
         user_choice = [bool(self.configuration.get('node_stats', True)),
@@ -329,13 +332,15 @@ class Service(UrlService):
                          METHODS(get_data_function=self._get_cluster_stats_, url=url_cluster_stats)]
 
         # Remove disabled API calls from 'avail methods'
-        self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
+        self.methods = [avail_methods[e[0]] for e in enumerate(avail_methods) if user_choice[e[0]]]
 
         # Run _get_data for ALL active API calls. 
         api_check_result = dict()
+        data_from_check = dict()
         for method in self.methods:
             try:
-                api_check_result[method.url] = (bool(method.get_data_function(None, method.url)))
+                api_check_result[method.url] = method.get_data_function(None, method.url)
+                data_from_check.update(api_check_result[method.url] or dict())
             except KeyError as error:
                 self.error('Failed to parse %s. Error: %s'  % (method.url, str(error)))
                 return False
@@ -343,25 +348,11 @@ class Service(UrlService):
         # We can start ONLY if all active API calls returned NOT None
         if not all(api_check_result.values()):
             self.error('Plugin could not get data from all APIs')
-            self.error('%s' % api_check_result)
             return False
         else:
-            self.info('%s' % api_check_result)
-            self.info('Plugin was started successfully')
-
+            self._data_from_check = data_from_check
             return True
 
-    def _get_raw_data(self, url):
-        try:
-            if not self.auth:
-                raw_data = get(url)
-            else:
-                raw_data = get(url, auth=(self.user, self.password))
-        except Exception:
-            return None
-
-        return raw_data
-
     def _get_data(self):
         threads = list()
         queue = Queue()
@@ -384,12 +375,12 @@ class Service(UrlService):
         :return: dict
         """
 
-        data = self._get_raw_data(url)
+        raw_data = self._get_raw_data(url)
 
-        if not data:
+        if not raw_data:
             return queue.put(dict()) if queue else None
         else:
-            data = data.json() if hasattr(data.json, '__call__') else data.json
+            data = loads(raw_data)
 
             to_netdata = fetch_data_(raw_data=data, metrics_list=HEALTH_STATS)
 
@@ -406,12 +397,12 @@ class Service(UrlService):
         :return: dict
         """
 
-        data = self._get_raw_data(url)
+        raw_data = self._get_raw_data(url)
 
-        if not data:
+        if not raw_data:
             return queue.put(dict()) if queue else None
         else:
-            data = data.json() if hasattr(data.json, '__call__') else data.json
+            data = loads(raw_data)
 
             to_netdata = fetch_data_(raw_data=data, metrics_list=CLUSTER_STATS)
 
@@ -423,12 +414,12 @@ class Service(UrlService):
         :return: dict
         """
 
-        data = self._get_raw_data(url)
+        raw_data = self._get_raw_data(url)
 
-        if not data:
+        if not raw_data:
             return queue.put(dict()) if queue else None
         else:
-            data = data.json() if hasattr(data.json, '__call__') else data.json
+            data = loads(raw_data)
 
             node = list(data['nodes'].keys())[0]
             to_netdata = fetch_data_(raw_data=data['nodes'][node], metrics_list=NODE_STATS)
index 5d14dc169399a3e227329ca1acbe64f7ecc52161..859300ecae8d704d6cdc9bae7f31fffd91982a95 100644 (file)
@@ -27,6 +27,11 @@ import ssl
 from subprocess import Popen, PIPE
 from sys import exc_info
 
+try:
+    from urlparse import urlparse
+except ImportError:
+    from urllib.parse import urlparse
+
 try:
     import urllib.request as urllib2
 except ImportError:
@@ -462,109 +467,78 @@ class SimpleService(threading.Thread):
 
 
 class UrlService(SimpleService):
-    # TODO add support for https connections
     def __init__(self, configuration=None, name=None):
-        self.url = ""
-        self.user = None
-        self.password = None
-        self.proxies = {}
         SimpleService.__init__(self, configuration=configuration, name=name)
+        self.url = self.configuration.get('url')
+        self.user = self.configuration.get('user')
+        self.password = self.configuration.get('pass')
+        self.ss_cert = self.configuration.get('ss_cert')
 
     def __add_openers(self):
-        # TODO add error handling
-        if self.ss_cert:
-            try:
-                ctx = ssl.create_default_context()
-                ctx.check_hostname = False
-                ctx.verify_mode = ssl.CERT_NONE
-                self.opener = urllib2.build_opener(urllib2.HTTPSHandler(context=ctx))
-            except Exception as error:
-                self.error(str(error))
-                self.opener = urllib2.build_opener()
-        else:
-            self.opener = urllib2.build_opener()
-
-        # Proxy handling
-        # TODO currently self.proxies isn't parsed from configuration file
-        # if len(self.proxies) > 0:
-        #     for proxy in self.proxies:
-        #         url = proxy['url']
-        #         # TODO test this:
-        #         if "user" in proxy and "pass" in proxy:
-        #             if url.lower().startswith('https://'):
-        #                 url = 'https://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[8:]
-        #             else:
-        #                 url = 'http://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[7:]
-        #         # FIXME move proxy auth to sth like this:
-        #         #     passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
-        #         #     passman.add_password(None, url, proxy['user'], proxy['password'])
-        #         #     opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
-        #
-        #         if url.lower().startswith('https://'):
-        #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
-        #         else:
-        #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
+        def self_signed_cert(ss_cert):
+            if ss_cert:
+                try:
+                    ctx = ssl.create_default_context()
+                    ctx.check_hostname = False
+                    ctx.verify_mode = ssl.CERT_NONE
+                    return urllib2.build_opener(urllib2.HTTPSHandler(context=ctx))
+                except AttributeError:
+                    return None
+            else:
+                return None
+
+        self.opener = self_signed_cert(self.ss_cert) or urllib2.build_opener()
 
         # HTTP Basic Auth
-        if self.user is not None and self.password is not None:
+        if self.user and self.password:
+            url_parse = urlparse(self.url)
+            top_level_url = '://'.join([url_parse.scheme, url_parse.netloc])
             passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
-            passman.add_password(None, self.url, self.user, self.password)
+            passman.add_password(None, top_level_url, self.user, self.password)
             self.opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
             self.debug("Enabling HTTP basic auth")
 
-        #urllib2.install_opener(opener)
-
-    def _get_raw_data(self):
+    def _get_raw_data(self, custom_url=None):
         """
         Get raw data from http request
         :return: str
         """
-        raw = None
+        raw_data = None
+        f = None
         try:
-            f = self.opener.open(self.url, timeout=self.update_every * 2)
-            # f = urllib2.urlopen(self.url, timeout=self.update_every * 2)
-        except Exception as e:
-            self.error(str(e))
+            f = self.opener.open(custom_url or self.url, timeout=self.update_every * 2)
+            raw_data = f.read().decode('utf-8', 'ignore')
+        except Exception as error:
+            self.error('Url: %s. Error: %s' %(custom_url or self.url, str(error)))
             return None
-
-        try:
-            raw = f.read().decode('utf-8', 'ignore')
-        except Exception as e:
-            self.error(str(e))
         finally:
-            f.close()
-        return raw
+            if f is not None: f.close()
+
+        return raw_data or None
 
     def check(self):
         """
         Format configuration data and try to connect to server
         :return: boolean
         """
-        if self.name is None or self.name == str(None):
-            self.name = 'local'
-            self.chart_name += "_" + self.name
-        else:
-            self.name = str(self.name)
-        try:
-            self.url = str(self.configuration['url'])
-        except (KeyError, TypeError):
-            pass
-        try:
-            self.user = str(self.configuration['user'])
-        except (KeyError, TypeError):
-            pass
-        try:
-            self.password = str(self.configuration['pass'])
-        except (KeyError, TypeError):
-            pass
-        self.ss_cert = self.configuration.get('ss_cert')
+        if not (self.url and isinstance(self.url, str)):
+            self.error('URL is not defined or type is not <str>')
+            return False
+
         self.__add_openers()
 
-        test = self._get_data()
-        if test is None or len(test) == 0:
+        try:
+            data = self._get_data()
+        except Exception as error:
+            self.error('_get_data() failed. Url: %s. Error: %s' % (self.url, error))
             return False
-        else:
+
+        if isinstance(data, dict) and data:
+            self._data_from_check = data
             return True
+        else:
+            self.error("_get_data() returned no data or type is not <dict>")
+            return False
 
 
 class SocketService(SimpleService):