]> arthur.barton.de Git - netdata.git/commitdiff
multithreading
authorpaulfantom <paulfantom@gmail.com>
Thu, 23 Jun 2016 09:21:26 +0000 (11:21 +0200)
committerpaulfantom <paulfantom@gmail.com>
Thu, 23 Jun 2016 09:21:26 +0000 (11:21 +0200)
plugins.d/python.d.plugin
python.d/apache.chart.py
python.d/example.chart.py
python.d/python_modules/base.py
python.d/python_modules/msg.py [new file with mode: 0644]

index e9f1d5de839e01c4a4c59bd5a8efec614949bc9f..8ae26dbbcb63390732ce2be843ea2dc20714aa3b 100755 (executable)
@@ -7,6 +7,7 @@
 import os
 import sys
 import time
+import threading
 
 # -----------------------------------------------------------------------------
 # globals & environment setup
@@ -17,7 +18,7 @@ BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
                'retries': 10}
 
 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
-                              os.path.dirname(__file__)) + "/../python.d") + "/"
+                                        os.path.dirname(__file__)) + "/../python.d") + "/"
 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
 # directories should end with '/'
 if CONFIG_DIR[-1] != "/":
@@ -28,60 +29,10 @@ PROGRAM = os.path.basename(__file__).replace(".plugin", "")
 DEBUG_FLAG = False
 OVERRIDE_UPDATE_EVERY = False
 
-
 # -----------------------------------------------------------------------------
-# logging functions
-def log_msg(msg_type, *args):
-    """
-    Print message on stderr.
-    :param msg_type: str
-    """
-    sys.stderr.write(PROGRAM)
-    sys.stderr.write(" ")
-    sys.stderr.write(msg_type)
-    sys.stderr.write(": ")
-    for i in args:
-        sys.stderr.write(" ")
-        sys.stderr.write(str(i))
-    sys.stderr.write("\n")
-    sys.stderr.flush()
-
-
-def debug(*args):
-    """
-    Print debug message on stderr.
-    """
-    if not DEBUG_FLAG:
-        return
-
-    log_msg("DEBUG", *args)
-
-
-def error(*args):
-    """
-    Print message on stderr.
-    """
-    log_msg("ERROR", *args)
-
-
-def info(*args):
-    """
-    Print message on stderr.
-    """
-    log_msg("INFO", *args)
-
-
-def fatal(*args):
-    """
-    Print message on stderr and exit.
-    """
-    log_msg("FATAL", *args)
-    sys.stdout.write('DISABLE\n')
-    sys.exit(1)
+# custom, third party and version specific python modules management
+import msg
 
-
-# -----------------------------------------------------------------------------
-# third party and version specific python modules management
 try:
     assert sys.version_info >= (3, 1)
     import importlib.machinery
@@ -90,7 +41,7 @@ try:
     # import builtins
     # builtins.PY_VERSION = 3
     PY_VERSION = 3
-    info('Using python v3')
+    msg.info('Using python v3')
 except (AssertionError, ImportError):
     try:
         import imp
@@ -99,19 +50,20 @@ except (AssertionError, ImportError):
         # import __builtin__
         # __builtin__.PY_VERSION = 2
         PY_VERSION = 2
-        info('Using python v2')
+        msg.info('Using python v2')
     except ImportError:
-        fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
+        msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
 try:
     import yaml
 except ImportError:
-    fatal('Cannot find yaml library')
+    msg.fatal('Cannot find yaml library')
 
 
 class PythonCharts(object):
     """
     Main class used to control every python module.
     """
+
     def __init__(self,
                  modules=None,
                  modules_path='../python.d/',
@@ -167,7 +119,7 @@ class PythonCharts(object):
             else:
                 return imp.load_source(name, path)
         except Exception as e:
-            error("Problem loading", name, str(e))
+            msg.error("Problem loading", name, str(e))
             return None
 
     def _load_modules(self, path, modules, disabled):
@@ -181,7 +133,7 @@ class PythonCharts(object):
 
         # check if plugin directory exists
         if not os.path.isdir(path):
-            fatal("cannot find charts directory ", path)
+            msg.fatal("cannot find charts directory ", path)
 
         # load modules
         loaded = []
@@ -193,17 +145,17 @@ class PythonCharts(object):
                 if mod is not None:
                     loaded.append(mod)
                 else:  # exit if plugin is not found
-                    fatal('no modules found.')
+                    msg.fatal('no modules found.')
         else:
             # scan directory specified in path and load all modules from there
             names = os.listdir(path)
             for mod in names:
                 if mod.replace(MODULE_EXTENSION, "") in disabled:
-                    error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
+                    msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
                     continue
                 m = self._import_module(path + mod)
                 if m is not None:
-                    debug(mod + ": loading module '" + path + mod + "'")
+                    msg.debug(mod + ": loading module '" + path + mod + "'")
                     loaded.append(m)
         return loaded
 
@@ -218,7 +170,7 @@ class PythonCharts(object):
         for mod in modules:
             configfile = self.configs + mod.__name__ + ".conf"
             if os.path.isfile(configfile):
-                debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
+                msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
                 try:
                     if not hasattr(mod, 'config'):
                         mod.config = {}
@@ -226,12 +178,12 @@ class PythonCharts(object):
                             'config',
                             self._parse_config(mod, read_config(configfile)))
                 except Exception as e:
-                    error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
+                    msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
             else:
-                error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
+                msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
                 # set config if not found
                 if not hasattr(mod, 'config'):
-                    debug(mod.__name__ + ": setting configuration for only one job")
+                    msg.debug(mod.__name__ + ": setting configuration for only one job")
                     mod.config = {None: {}}
                     for var in BASE_CONFIG:
                         try:
@@ -256,7 +208,7 @@ class PythonCharts(object):
         """
         # get default values
         defaults = {}
-        debug(module.__name__ + ": reading configuration")
+        msg.debug(module.__name__ + ": reading configuration")
         for key in BASE_CONFIG:
             try:
                 # get defaults from module config
@@ -305,10 +257,10 @@ class PythonCharts(object):
                 try:
                     job = module.Service(configuration=conf, name=name)
                 except Exception as e:
-                    error(module.__name__ +
-                          ("/" + str(name) if name is not None else "") +
-                          ": cannot start job: '" +
-                          str(e))
+                    msg.error(module.__name__ +
+                              ("/" + str(name) if name is not None else "") +
+                              ": cannot start job: '" +
+                              str(e))
                     return None
                 else:
                     # set chart_name (needed to plot run time graphs)
@@ -316,7 +268,7 @@ class PythonCharts(object):
                     if name is not None:
                         job.chart_name += "_" + name
                 jobs.append(job)
-                debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
+                msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
 
         return [j for j in jobs if j is not None]
 
@@ -336,22 +288,22 @@ class PythonCharts(object):
         if reason is None:
             return
         elif reason[:3] == "no ":
-            error(prefix +
-                  "does not seem to have " +
-                  reason[3:] +
-                  "() function. Disabling it.")
+            msg.error(prefix +
+                      "does not seem to have " +
+                      reason[3:] +
+                      "() function. Disabling it.")
         elif reason[:7] == "failed ":
-            error(prefix +
-                  reason[7:] +
-                  "() function reports failure.")
+            msg.error(prefix +
+                      reason[7:] +
+                      "() function reports failure.")
         elif reason[:13] == "configuration":
-            error(prefix +
-                  "configuration file '" +
-                  self.configs +
-                  job.__module__ +
-                  ".conf' not found. Using defaults.")
+            msg.error(prefix +
+                      "configuration file '" +
+                      self.configs +
+                      job.__module__ +
+                      ".conf' not found. Using defaults.")
         elif reason[:11] == "misbehaving":
-            error(prefix + "is " + reason)
+            msg.error(prefix + "is " + reason)
 
     def check(self):
         """
@@ -366,7 +318,7 @@ class PythonCharts(object):
                 if not job.check():
                     self._stop(job, "failed check")
                 else:
-                    debug(job.chart_name, ": check succeeded")
+                    msg.debug(job.chart_name, ": check succeeded")
                     i += 1
             except AttributeError:
                 self._stop(job, "no check")
@@ -397,7 +349,7 @@ class PythonCharts(object):
                         str(job.timetable['freq']) +
                         '\n')
                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
-                    debug("created charts for", job.chart_name)
+                    msg.debug("created charts for", job.chart_name)
                     # sys.stdout.flush()
                     i += 1
             except AttributeError:
@@ -405,82 +357,14 @@ class PythonCharts(object):
             except (UnboundLocalError, Exception) as e:
                 self._stop(job, "misbehaving. Reason: " + str(e))
 
-    def _update_job(self, job):
-        """
-        Tries to execute update() on specified job.
-        This cannot fail thus it is catching every exception.
-        If job.update() returns False, number of retries_left is decremented.
-        If there are no more retries, job is stopped.
-        Job is also stopped if it throws an exception.
-        This is also updating job run time chart.
-        Return False if job is stopped
-        :param job: object
-        :return: boolean
-        """
-        t_start = time.time()
-        # check if it is time to execute job update() function
-        if job.timetable['next'] > t_start:
-            debug(job.chart_name + " will be run in " + str(int((job.timetable['next'] - t_start) * 1000)) + " ms")
-            return True
-        try:
-            if self.first_run:
-                since_last = 0
-            else:
-                since_last = int((t_start - job.timetable['last']) * 1000000)
-                debug(job.chart_name +
-                      " ready to run, after " + str(int((t_start - job.timetable['last']) * 1000)) +
-                      " ms (update_every: " + str(job.timetable['freq'] * 1000) +
-                      " ms, latency: " + str(int((t_start - job.timetable['next']) * 1000)) + " ms)")
-            if not job.update(since_last):
-                if job.retries_left <= 0:
-                    self._stop(job, "update failed")
-                    return False
-                job.retries_left -= 1
-                job.timetable['next'] += job.timetable['freq']
-                return True
-        except AttributeError:
-            self._stop(job, "no update")
-            return False
-        except (UnboundLocalError, Exception) as e:
-            self._stop(job, "misbehaving. Reason: " + str(e))
-            return False
-        t_end = time.time()
-        job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
-        # draw performance graph
-        run_time = str(int((t_end - t_start) * 1000))
-        debug(job.chart_name, "updated in", run_time, "ms")
-        sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n')
-        sys.stdout.write("SET run_time = " + run_time + '\n')
-        sys.stdout.write("END\n")
-        # sys.stdout.flush()
-        job.timetable['last'] = t_start
-        job.retries_left = job.retries
-        self.first_run = False
-        return True
-
     def update(self):
-        """
-        Tries to execute update() on every job by using _update_job()
-        This will stay forever and ever and ever forever and ever it'll be the one...
-        """
-        self.first_run = True
+        for job in self.jobs:
+            job.start()
+
         while True:
-            next_runs = []
-            i = 0
-            while i < len(self.jobs):
-                job = self.jobs[i]
-                if self._update_job(job):
-                    try:
-                        next_runs.append(job.timetable['next'])
-                    except KeyError:
-                        pass
-                    i += 1
-            if len(next_runs) == 0:
-                fatal('no python.d modules loaded.')
-            try:
-                time.sleep(min(next_runs) - time.time())
-            except IOError:
-                pass
+            if threading.active_count() <= 1:
+                msg.fatal("no more jobs")
+            time.sleep(1)
 
 
 def read_config(path):
@@ -493,10 +377,10 @@ def read_config(path):
         with open(path, 'r') as stream:
             config = yaml.load(stream)
     except (OSError, IOError):
-        error(str(path), "is not a valid configuration file")
+        msg.error(str(path), "is not a valid configuration file")
         return None
     except yaml.YAMLError as e:
-        error(str(path), "is malformed:", e)
+        msg.error(str(path), "is malformed:", e)
         return None
     return config
 
@@ -531,9 +415,9 @@ def parse_cmdline(directory, *commands):
                 pass
     if changed_update and DEBUG_FLAG:
         OVERRIDE_UPDATE_EVERY = True
-        debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
+        msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
 
-    debug("started from", commands[0], "with options:", *commands[1:])
+    msg.debug("started from", commands[0], "with options:", *commands[1:])
 
     return mods
 
@@ -548,14 +432,14 @@ def run():
     # read configuration file
     disabled = []
     configfile = CONFIG_DIR + "python.d.conf"
-    debug(PROGRAM, "reading configuration file:", configfile)
+    msg.info(PROGRAM, "reading configuration file:", configfile)
 
     conf = read_config(configfile)
     if conf is not None:
         try:
             # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
             if conf['enabled'] is False:
-                fatal('disabled in configuration file.\n')
+                msg.fatal('disabled in configuration file.\n')
         except (KeyError, TypeError):
             pass
         try:
@@ -573,19 +457,22 @@ def run():
             if v is False:
                 disabled.append(k)
 
+    DEBUG_FLAG = True
+    msg.DEBUG_FLAG = DEBUG_FLAG
+    msg.PROGRAM = PROGRAM
     # parse passed command line arguments
     modules = parse_cmdline(MODULES_DIR, *sys.argv)
-    info("MODULES_DIR='" + MODULES_DIR +
-         "', CONFIG_DIR='" + CONFIG_DIR +
-         "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
-         ", ONLY_MODULES=" + str(modules))
+    msg.info("MODULES_DIR='" + MODULES_DIR +
+             "', CONFIG_DIR='" + CONFIG_DIR +
+             "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
+             ", ONLY_MODULES=" + str(modules))
 
     # run plugins
     charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
     charts.check()
     charts.create()
     charts.update()
-    fatal("finished")
+    msg.fatal("finished")
 
 
 if __name__ == '__main__':
index 94b05713e1eb73524794293bc5a0b5a556db9b9e..f61242ecce16c47a811ff5c1b52981d995a8096f 100644 (file)
@@ -112,8 +112,7 @@ class Service(UrlService):
             if str(tmp[0]) in self.assignment:
                 try:
                     data[self.assignment[tmp[0]]] = int(float(tmp[1]))
-                except (IndexError, ValueError) as a:
-                    print(a)
+                except (IndexError, ValueError):
                     pass
         if len(data) == 0:
             return None
index b36aa7b3fc58f9d8572dc1d988af5790e8028049..18c72032eb5a4930983a9ecce225daebae3308d6 100644 (file)
@@ -13,6 +13,7 @@ update_every = 4
 priority = 90000
 retries = 7
 
+
 class Service(BaseService):
     def __init__(self, configuration=None, name=None):
         super(self.__class__,self).__init__(configuration=configuration, name=name)
@@ -21,15 +22,15 @@ class Service(BaseService):
         return True
     
     def create(self):
-        print("CHART example.python_random '' 'A random number' 'random number' random random line " +
-              str(self.priority) +
-              " " +
-              str(self.update_every))
-        print("DIMENSION random1 '' absolute 1 1")
+        chart = "CHART example.python_random '' 'A random number' 'random number' random random line " + \
+                str(self.priority) + " " + \
+                str(self.update_every) + "\n" + \
+                "DIMENSION random1 '' absolute 1 1"
+        print(chart)
         return True
     
     def update(self, interval):
-        print("BEGIN example.python_random "+str(interval))
-        print("SET random1 = "+str(random.randint(0,100)))
-        print("END")
+        chart = "BEGIN example.python_random "+str(interval)+"\n"
+        chart += "SET random1 = "+str(random.randint(0,100))+"\n"
+        print(chart + "END")
         return True
index 710ead1bd95036e7450beb2f587b6bff99c6897d..2fcd2a7b6ed8f28b94dd6781adf85a0b4c8e61c5 100644 (file)
@@ -2,25 +2,36 @@
 # Description: prototypes for netdata python.d modules
 # Author: Pawel Krupa (paulfantom)
 
-from time import time
+import time
 import sys
 try:
     from urllib.request import urlopen
 except ImportError:
     from urllib2 import urlopen
 
+import threading
+import msg
 
-class BaseService(object):
+
+class BaseService(threading.Thread):
     """
     Prototype of Service class.
     Implemented basic functionality to run jobs by `python.d.plugin`
     """
+    debugging = False
+
     def __init__(self, configuration=None, name=None):
         """
         This needs to be initialized in child classes
         :param configuration: dict
         :param name: str
         """
+        threading.Thread.__init__(self)
+        self.daemon = True
+        self.retries = 0
+        self.retries_left = 0
+        self.priority = 140000
+        self.update_every = 1
         self.name = name
         if configuration is None:
             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
@@ -58,24 +69,64 @@ class BaseService(object):
         """
         if freq is None:
             freq = self.update_every
-        now = time()
+        now = time.time()
         self.timetable = {'last': now,
                           'next': now - (now % freq) + freq,
                           'freq': freq}
 
-    @staticmethod
-    def error(msg, exception=""):
-        if exception != "":
-            exception = " " + str(exception).replace("\n", " ")
-        sys.stderr.write(str(msg)+exception+"\n")
-        sys.stderr.flush()
+    def _run_once(self):
+        t_start = time.time()
+        # check if it is time to execute job update() function
+        if self.timetable['next'] > t_start:
+            msg.debug(self.chart_name + " will be run in " +
+                      str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
+            return True
+
+        since_last = int((t_start - self.timetable['last']) * 1000000)
+        msg.debug(self.chart_name +
+                  " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
+                  " ms (update_every: " + str(self.timetable['freq'] * 1000) +
+                  " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
+        if not self.update(since_last):
+            return False
+        t_end = time.time()
+        self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
+
+        # draw performance graph
+        run_time = str(int((t_end - t_start) * 1000))
+        run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
+        run_time_chart += "SET run_time = " + run_time + '\n'
+        run_time_chart += "END\n"
+        sys.stdout.write(run_time_chart)
+        msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
+        self.timetable['last'] = t_start
+        return True
+
+    def run(self):
+        self.timetable['last'] = time.time()
+        while True:
+            try:
+                status = self._run_once()
+            except Exception as e:
+                msg.error("Something wrong: " + str(e))
+                return
+            if status:
+                time.sleep(self.timetable['next'] - time.time())
+                self.retries_left = self.retries
+            else:
+                self.retries -= 1
+                if self.retries_left <= 0:
+                    msg.error("no more retries. Exiting")
+                    return
+                else:
+                    time.sleep(self.timetable['freq'])
 
     def check(self):
         """
         check() prototype
         :return: boolean
         """
-        self.error("Service " + str(self.__module__) + "doesn't implement check() function")
+        msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
         return False
 
     def create(self):
@@ -83,7 +134,7 @@ class BaseService(object):
         create() prototype
         :return: boolean
         """
-        self.error("Service " + str(self.__module__) + "doesn't implement create() function?")
+        msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
         return False
 
     def update(self, interval):
@@ -92,7 +143,7 @@ class BaseService(object):
         :param interval: int
         :return: boolean
         """
-        self.error("Service " + str(self.__module__) + "doesn't implement update() function")
+        msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
         return False
 
 
@@ -128,7 +179,7 @@ class UrlService(BaseService):
             f = urlopen(self.url, timeout=self.update_every)
             raw = f.read().decode('utf-8')
         except Exception as e:
-            self.error(self.__module__, str(e))
+            msg.error(self.__module__, str(e))
         finally:
             try:
                 f.close()
@@ -148,7 +199,7 @@ class UrlService(BaseService):
         Format configuration data and try to connect to server
         :return: boolean
         """
-        if self.name is None:
+        if self.name is None or self.name == str(None):
             self.name = 'local'
         else:
             self.name = str(self.name)
@@ -193,8 +244,7 @@ class UrlService(BaseService):
                     content += "DIMENSION " + line['name'] + " " + line['options'] + "\n"
 
             if len(content) > 0:
-                print(header)
-                print(content)
+                print(header + "\n" + content)
                 idx += 1
 
         if idx == 0:
@@ -220,7 +270,6 @@ class UrlService(BaseService):
                 except KeyError:
                     pass
             if len(c) != 0:
-                print(header + c)
-                print("END")
+                print(header + c + "\nEND")
 
         return True
diff --git a/python.d/python_modules/msg.py b/python.d/python_modules/msg.py
new file mode 100644 (file)
index 0000000..fbaff21
--- /dev/null
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+# Description: logging for netdata python.d modules
+
+import sys
+
+DEBUG_FLAG = False
+PROGRAM = ""
+
+
+def log_msg(msg_type, *args):
+    """
+    Print message on stderr.
+    :param msg_type: str
+    """
+    sys.stderr.write(PROGRAM)
+    sys.stderr.write(" ")
+    sys.stderr.write(msg_type)
+    sys.stderr.write(": ")
+    for i in args:
+        sys.stderr.write(" ")
+        sys.stderr.write(str(i))
+    sys.stderr.write("\n")
+    sys.stderr.flush()
+
+
+def debug(*args):
+    """
+    Print debug message on stderr.
+    """
+    if not DEBUG_FLAG:
+        return
+
+    log_msg("DEBUG", *args)
+
+
+def error(*args):
+    """
+    Print message on stderr.
+    """
+    log_msg("ERROR", *args)
+
+
+def info(*args):
+    """
+    Print message on stderr.
+    """
+    log_msg("INFO", *args)
+
+
+def fatal(*args):
+    """
+    Print message on stderr and exit.
+    """
+    log_msg("FATAL", *args)
+    sys.stdout.write('DISABLE\n')
+    sys.exit(1)
\ No newline at end of file