From: paulfantom Date: Thu, 23 Jun 2016 09:21:26 +0000 (+0200) Subject: multithreading X-Git-Tag: v1.3.0~105^2~12 X-Git-Url: https://arthur.barton.de/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=dc36d60a7da55e6c47bf802e3a2df28f7043ad87;p=netdata.git multithreading --- diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin index e9f1d5de..8ae26dbb 100755 --- a/plugins.d/python.d.plugin +++ b/plugins.d/python.d.plugin @@ -7,6 +7,7 @@ import os import sys import time +import threading # ----------------------------------------------------------------------------- # globals & environment setup @@ -17,7 +18,7 @@ BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), 'retries': 10} MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', - os.path.dirname(__file__)) + "/../python.d") + "/" + os.path.dirname(__file__)) + "/../python.d") + "/" CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/") # directories should end with '/' if CONFIG_DIR[-1] != "/": @@ -28,60 +29,10 @@ PROGRAM = os.path.basename(__file__).replace(".plugin", "") DEBUG_FLAG = False OVERRIDE_UPDATE_EVERY = False - # ----------------------------------------------------------------------------- -# logging functions -def log_msg(msg_type, *args): - """ - Print message on stderr. - :param msg_type: str - """ - sys.stderr.write(PROGRAM) - sys.stderr.write(" ") - sys.stderr.write(msg_type) - sys.stderr.write(": ") - for i in args: - sys.stderr.write(" ") - sys.stderr.write(str(i)) - sys.stderr.write("\n") - sys.stderr.flush() - - -def debug(*args): - """ - Print debug message on stderr. - """ - if not DEBUG_FLAG: - return - - log_msg("DEBUG", *args) - - -def error(*args): - """ - Print message on stderr. - """ - log_msg("ERROR", *args) - - -def info(*args): - """ - Print message on stderr. - """ - log_msg("INFO", *args) - - -def fatal(*args): - """ - Print message on stderr and exit. - """ - log_msg("FATAL", *args) - sys.stdout.write('DISABLE\n') - sys.exit(1) +# custom, third party and version specific python modules management +import msg - -# ----------------------------------------------------------------------------- -# third party and version specific python modules management try: assert sys.version_info >= (3, 1) import importlib.machinery @@ -90,7 +41,7 @@ try: # import builtins # builtins.PY_VERSION = 3 PY_VERSION = 3 - info('Using python v3') + msg.info('Using python v3') except (AssertionError, ImportError): try: import imp @@ -99,19 +50,20 @@ except (AssertionError, ImportError): # import __builtin__ # __builtin__.PY_VERSION = 2 PY_VERSION = 2 - info('Using python v2') + msg.info('Using python v2') except ImportError: - fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') + msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') try: import yaml except ImportError: - fatal('Cannot find yaml library') + msg.fatal('Cannot find yaml library') class PythonCharts(object): """ Main class used to control every python module. """ + def __init__(self, modules=None, modules_path='../python.d/', @@ -167,7 +119,7 @@ class PythonCharts(object): else: return imp.load_source(name, path) except Exception as e: - error("Problem loading", name, str(e)) + msg.error("Problem loading", name, str(e)) return None def _load_modules(self, path, modules, disabled): @@ -181,7 +133,7 @@ class PythonCharts(object): # check if plugin directory exists if not os.path.isdir(path): - fatal("cannot find charts directory ", path) + msg.fatal("cannot find charts directory ", path) # load modules loaded = [] @@ -193,17 +145,17 @@ class PythonCharts(object): if mod is not None: loaded.append(mod) else: # exit if plugin is not found - fatal('no modules found.') + msg.fatal('no modules found.') else: # scan directory specified in path and load all modules from there names = os.listdir(path) for mod in names: if mod.replace(MODULE_EXTENSION, "") in disabled: - error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, "")) + msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, "")) continue m = self._import_module(path + mod) if m is not None: - debug(mod + ": loading module '" + path + mod + "'") + msg.debug(mod + ": loading module '" + path + mod + "'") loaded.append(m) return loaded @@ -218,7 +170,7 @@ class PythonCharts(object): for mod in modules: configfile = self.configs + mod.__name__ + ".conf" if os.path.isfile(configfile): - debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") + msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") try: if not hasattr(mod, 'config'): mod.config = {} @@ -226,12 +178,12 @@ class PythonCharts(object): 'config', self._parse_config(mod, read_config(configfile))) except Exception as e: - error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) + msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) else: - error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") + msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") # set config if not found if not hasattr(mod, 'config'): - debug(mod.__name__ + ": setting configuration for only one job") + msg.debug(mod.__name__ + ": setting configuration for only one job") mod.config = {None: {}} for var in BASE_CONFIG: try: @@ -256,7 +208,7 @@ class PythonCharts(object): """ # get default values defaults = {} - debug(module.__name__ + ": reading configuration") + msg.debug(module.__name__ + ": reading configuration") for key in BASE_CONFIG: try: # get defaults from module config @@ -305,10 +257,10 @@ class PythonCharts(object): try: job = module.Service(configuration=conf, name=name) except Exception as e: - error(module.__name__ + - ("/" + str(name) if name is not None else "") + - ": cannot start job: '" + - str(e)) + msg.error(module.__name__ + + ("/" + str(name) if name is not None else "") + + ": cannot start job: '" + + str(e)) return None else: # set chart_name (needed to plot run time graphs) @@ -316,7 +268,7 @@ class PythonCharts(object): if name is not None: job.chart_name += "_" + name jobs.append(job) - debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") + msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") return [j for j in jobs if j is not None] @@ -336,22 +288,22 @@ class PythonCharts(object): if reason is None: return elif reason[:3] == "no ": - error(prefix + - "does not seem to have " + - reason[3:] + - "() function. Disabling it.") + msg.error(prefix + + "does not seem to have " + + reason[3:] + + "() function. Disabling it.") elif reason[:7] == "failed ": - error(prefix + - reason[7:] + - "() function reports failure.") + msg.error(prefix + + reason[7:] + + "() function reports failure.") elif reason[:13] == "configuration": - error(prefix + - "configuration file '" + - self.configs + - job.__module__ + - ".conf' not found. Using defaults.") + msg.error(prefix + + "configuration file '" + + self.configs + + job.__module__ + + ".conf' not found. Using defaults.") elif reason[:11] == "misbehaving": - error(prefix + "is " + reason) + msg.error(prefix + "is " + reason) def check(self): """ @@ -366,7 +318,7 @@ class PythonCharts(object): if not job.check(): self._stop(job, "failed check") else: - debug(job.chart_name, ": check succeeded") + msg.debug(job.chart_name, ": check succeeded") i += 1 except AttributeError: self._stop(job, "no check") @@ -397,7 +349,7 @@ class PythonCharts(object): str(job.timetable['freq']) + '\n') sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n") - debug("created charts for", job.chart_name) + msg.debug("created charts for", job.chart_name) # sys.stdout.flush() i += 1 except AttributeError: @@ -405,82 +357,14 @@ class PythonCharts(object): except (UnboundLocalError, Exception) as e: self._stop(job, "misbehaving. Reason: " + str(e)) - def _update_job(self, job): - """ - Tries to execute update() on specified job. - This cannot fail thus it is catching every exception. - If job.update() returns False, number of retries_left is decremented. - If there are no more retries, job is stopped. - Job is also stopped if it throws an exception. - This is also updating job run time chart. - Return False if job is stopped - :param job: object - :return: boolean - """ - t_start = time.time() - # check if it is time to execute job update() function - if job.timetable['next'] > t_start: - debug(job.chart_name + " will be run in " + str(int((job.timetable['next'] - t_start) * 1000)) + " ms") - return True - try: - if self.first_run: - since_last = 0 - else: - since_last = int((t_start - job.timetable['last']) * 1000000) - debug(job.chart_name + - " ready to run, after " + str(int((t_start - job.timetable['last']) * 1000)) + - " ms (update_every: " + str(job.timetable['freq'] * 1000) + - " ms, latency: " + str(int((t_start - job.timetable['next']) * 1000)) + " ms)") - if not job.update(since_last): - if job.retries_left <= 0: - self._stop(job, "update failed") - return False - job.retries_left -= 1 - job.timetable['next'] += job.timetable['freq'] - return True - except AttributeError: - self._stop(job, "no update") - return False - except (UnboundLocalError, Exception) as e: - self._stop(job, "misbehaving. Reason: " + str(e)) - return False - t_end = time.time() - job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq'] - # draw performance graph - run_time = str(int((t_end - t_start) * 1000)) - debug(job.chart_name, "updated in", run_time, "ms") - sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n') - sys.stdout.write("SET run_time = " + run_time + '\n') - sys.stdout.write("END\n") - # sys.stdout.flush() - job.timetable['last'] = t_start - job.retries_left = job.retries - self.first_run = False - return True - def update(self): - """ - Tries to execute update() on every job by using _update_job() - This will stay forever and ever and ever forever and ever it'll be the one... - """ - self.first_run = True + for job in self.jobs: + job.start() + while True: - next_runs = [] - i = 0 - while i < len(self.jobs): - job = self.jobs[i] - if self._update_job(job): - try: - next_runs.append(job.timetable['next']) - except KeyError: - pass - i += 1 - if len(next_runs) == 0: - fatal('no python.d modules loaded.') - try: - time.sleep(min(next_runs) - time.time()) - except IOError: - pass + if threading.active_count() <= 1: + msg.fatal("no more jobs") + time.sleep(1) def read_config(path): @@ -493,10 +377,10 @@ def read_config(path): with open(path, 'r') as stream: config = yaml.load(stream) except (OSError, IOError): - error(str(path), "is not a valid configuration file") + msg.error(str(path), "is not a valid configuration file") return None except yaml.YAMLError as e: - error(str(path), "is malformed:", e) + msg.error(str(path), "is malformed:", e) return None return config @@ -531,9 +415,9 @@ def parse_cmdline(directory, *commands): pass if changed_update and DEBUG_FLAG: OVERRIDE_UPDATE_EVERY = True - debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every'])) + msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every'])) - debug("started from", commands[0], "with options:", *commands[1:]) + msg.debug("started from", commands[0], "with options:", *commands[1:]) return mods @@ -548,14 +432,14 @@ def run(): # read configuration file disabled = [] configfile = CONFIG_DIR + "python.d.conf" - debug(PROGRAM, "reading configuration file:", configfile) + msg.info(PROGRAM, "reading configuration file:", configfile) conf = read_config(configfile) if conf is not None: try: # exit the whole plugin when 'enabled: no' is set in 'python.d.conf' if conf['enabled'] is False: - fatal('disabled in configuration file.\n') + msg.fatal('disabled in configuration file.\n') except (KeyError, TypeError): pass try: @@ -573,19 +457,22 @@ def run(): if v is False: disabled.append(k) + DEBUG_FLAG = True + msg.DEBUG_FLAG = DEBUG_FLAG + msg.PROGRAM = PROGRAM # parse passed command line arguments modules = parse_cmdline(MODULES_DIR, *sys.argv) - info("MODULES_DIR='" + MODULES_DIR + - "', CONFIG_DIR='" + CONFIG_DIR + - "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + - ", ONLY_MODULES=" + str(modules)) + msg.info("MODULES_DIR='" + MODULES_DIR + + "', CONFIG_DIR='" + CONFIG_DIR + + "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + + ", ONLY_MODULES=" + str(modules)) # run plugins charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled) charts.check() charts.create() charts.update() - fatal("finished") + msg.fatal("finished") if __name__ == '__main__': diff --git a/python.d/apache.chart.py b/python.d/apache.chart.py index 94b05713..f61242ec 100644 --- a/python.d/apache.chart.py +++ b/python.d/apache.chart.py @@ -112,8 +112,7 @@ class Service(UrlService): if str(tmp[0]) in self.assignment: try: data[self.assignment[tmp[0]]] = int(float(tmp[1])) - except (IndexError, ValueError) as a: - print(a) + except (IndexError, ValueError): pass if len(data) == 0: return None diff --git a/python.d/example.chart.py b/python.d/example.chart.py index b36aa7b3..18c72032 100644 --- a/python.d/example.chart.py +++ b/python.d/example.chart.py @@ -13,6 +13,7 @@ update_every = 4 priority = 90000 retries = 7 + class Service(BaseService): def __init__(self, configuration=None, name=None): super(self.__class__,self).__init__(configuration=configuration, name=name) @@ -21,15 +22,15 @@ class Service(BaseService): return True def create(self): - print("CHART example.python_random '' 'A random number' 'random number' random random line " + - str(self.priority) + - " " + - str(self.update_every)) - print("DIMENSION random1 '' absolute 1 1") + chart = "CHART example.python_random '' 'A random number' 'random number' random random line " + \ + str(self.priority) + " " + \ + str(self.update_every) + "\n" + \ + "DIMENSION random1 '' absolute 1 1" + print(chart) return True def update(self, interval): - print("BEGIN example.python_random "+str(interval)) - print("SET random1 = "+str(random.randint(0,100))) - print("END") + chart = "BEGIN example.python_random "+str(interval)+"\n" + chart += "SET random1 = "+str(random.randint(0,100))+"\n" + print(chart + "END") return True diff --git a/python.d/python_modules/base.py b/python.d/python_modules/base.py index 710ead1b..2fcd2a7b 100644 --- a/python.d/python_modules/base.py +++ b/python.d/python_modules/base.py @@ -2,25 +2,36 @@ # Description: prototypes for netdata python.d modules # Author: Pawel Krupa (paulfantom) -from time import time +import time import sys try: from urllib.request import urlopen except ImportError: from urllib2 import urlopen +import threading +import msg -class BaseService(object): + +class BaseService(threading.Thread): """ Prototype of Service class. Implemented basic functionality to run jobs by `python.d.plugin` """ + debugging = False + def __init__(self, configuration=None, name=None): """ This needs to be initialized in child classes :param configuration: dict :param name: str """ + threading.Thread.__init__(self) + self.daemon = True + self.retries = 0 + self.retries_left = 0 + self.priority = 140000 + self.update_every = 1 self.name = name if configuration is None: self.error("BaseService: no configuration parameters supplied. Cannot create Service.") @@ -58,24 +69,64 @@ class BaseService(object): """ if freq is None: freq = self.update_every - now = time() + now = time.time() self.timetable = {'last': now, 'next': now - (now % freq) + freq, 'freq': freq} - @staticmethod - def error(msg, exception=""): - if exception != "": - exception = " " + str(exception).replace("\n", " ") - sys.stderr.write(str(msg)+exception+"\n") - sys.stderr.flush() + def _run_once(self): + t_start = time.time() + # check if it is time to execute job update() function + if self.timetable['next'] > t_start: + msg.debug(self.chart_name + " will be run in " + + str(int((self.timetable['next'] - t_start) * 1000)) + " ms") + return True + + since_last = int((t_start - self.timetable['last']) * 1000000) + msg.debug(self.chart_name + + " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) + + " ms (update_every: " + str(self.timetable['freq'] * 1000) + + " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)") + if not self.update(since_last): + return False + t_end = time.time() + self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq'] + + # draw performance graph + run_time = str(int((t_end - t_start) * 1000)) + run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n' + run_time_chart += "SET run_time = " + run_time + '\n' + run_time_chart += "END\n" + sys.stdout.write(run_time_chart) + msg.debug(self.chart_name + " updated in " + str(run_time) + " ms") + self.timetable['last'] = t_start + return True + + def run(self): + self.timetable['last'] = time.time() + while True: + try: + status = self._run_once() + except Exception as e: + msg.error("Something wrong: " + str(e)) + return + if status: + time.sleep(self.timetable['next'] - time.time()) + self.retries_left = self.retries + else: + self.retries -= 1 + if self.retries_left <= 0: + msg.error("no more retries. Exiting") + return + else: + time.sleep(self.timetable['freq']) def check(self): """ check() prototype :return: boolean """ - self.error("Service " + str(self.__module__) + "doesn't implement check() function") + msg.error("Service " + str(self.__module__) + "doesn't implement check() function") return False def create(self): @@ -83,7 +134,7 @@ class BaseService(object): create() prototype :return: boolean """ - self.error("Service " + str(self.__module__) + "doesn't implement create() function?") + msg.error("Service " + str(self.__module__) + "doesn't implement create() function?") return False def update(self, interval): @@ -92,7 +143,7 @@ class BaseService(object): :param interval: int :return: boolean """ - self.error("Service " + str(self.__module__) + "doesn't implement update() function") + msg.error("Service " + str(self.__module__) + "doesn't implement update() function") return False @@ -128,7 +179,7 @@ class UrlService(BaseService): f = urlopen(self.url, timeout=self.update_every) raw = f.read().decode('utf-8') except Exception as e: - self.error(self.__module__, str(e)) + msg.error(self.__module__, str(e)) finally: try: f.close() @@ -148,7 +199,7 @@ class UrlService(BaseService): Format configuration data and try to connect to server :return: boolean """ - if self.name is None: + if self.name is None or self.name == str(None): self.name = 'local' else: self.name = str(self.name) @@ -193,8 +244,7 @@ class UrlService(BaseService): content += "DIMENSION " + line['name'] + " " + line['options'] + "\n" if len(content) > 0: - print(header) - print(content) + print(header + "\n" + content) idx += 1 if idx == 0: @@ -220,7 +270,6 @@ class UrlService(BaseService): except KeyError: pass if len(c) != 0: - print(header + c) - print("END") + print(header + c + "\nEND") return True diff --git a/python.d/python_modules/msg.py b/python.d/python_modules/msg.py new file mode 100644 index 00000000..fbaff211 --- /dev/null +++ b/python.d/python_modules/msg.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Description: logging for netdata python.d modules + +import sys + +DEBUG_FLAG = False +PROGRAM = "" + + +def log_msg(msg_type, *args): + """ + Print message on stderr. + :param msg_type: str + """ + sys.stderr.write(PROGRAM) + sys.stderr.write(" ") + sys.stderr.write(msg_type) + sys.stderr.write(": ") + for i in args: + sys.stderr.write(" ") + sys.stderr.write(str(i)) + sys.stderr.write("\n") + sys.stderr.flush() + + +def debug(*args): + """ + Print debug message on stderr. + """ + if not DEBUG_FLAG: + return + + log_msg("DEBUG", *args) + + +def error(*args): + """ + Print message on stderr. + """ + log_msg("ERROR", *args) + + +def info(*args): + """ + Print message on stderr. + """ + log_msg("INFO", *args) + + +def fatal(*args): + """ + Print message on stderr and exit. + """ + log_msg("FATAL", *args) + sys.stdout.write('DISABLE\n') + sys.exit(1) \ No newline at end of file