X-Git-Url: https://arthur.barton.de/gitweb/?a=blobdiff_plain;f=plugins.d%2Fpython.d.plugin;h=44b729094ce307189f3fcecbdfc32464c37d1b60;hb=9e06977a81b3a2204625588c74f2fff58d70356d;hp=76c2bb11a5c546ac07e34503d0560fe909c0dd85;hpb=84468ca7b52cb4f0a87bca3ba608191e2f43a81a;p=netdata.git diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin index 76c2bb11..44b72909 100755 --- a/plugins.d/python.d.plugin +++ b/plugins.d/python.d.plugin @@ -1,4 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/env bash +'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' # -*- coding: utf-8 -*- # Description: netdata python modules supervisor @@ -7,92 +8,42 @@ import os import sys import time - - -MODULE_EXTENSION = ".chart.py" - -# ----------------------------------------------------------------------------- -# logging functions - -PROGRAM = os.path.basename(__file__).replace(".plugin", "") -DEBUG_FLAG = False - - -def debug(*args): - """ - Print message on stderr. - """ - if not DEBUG_FLAG: - return - sys.stderr.write(PROGRAM + " DEBUG :") - for i in args: - sys.stderr.write(" " + str(i)) - sys.stderr.write("\n") - sys.stderr.flush() - - -def error(*args): - """ - Print message on stderr. - """ - sys.stderr.write(PROGRAM + " ERROR :") - for i in args: - sys.stderr.write(" " + str(i)) - sys.stderr.write("\n") - sys.stderr.flush() - - -def info(*args): - """ - Print message on stderr. - """ - sys.stderr.write(PROGRAM + " INFO :") - for i in args: - sys.stderr.write(" " + str(i)) - sys.stderr.write("\n") - sys.stderr.flush() - - -def fatal(*args): - """ - Print message on stderr and exit. - """ - sys.stderr.write(PROGRAM + " FATAL :") - for i in args: - sys.stderr.write(" " + str(i)) - sys.stderr.write("\n") - sys.stderr.flush() - sys.stdout.write('DISABLE\n') - sys.exit(1) - +import threading +from re import sub # ----------------------------------------------------------------------------- -# globals & python modules management - -# setup environment +# globals & environment setup # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables +MODULE_EXTENSION = ".chart.py" +BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), + 'priority': 90000, + 'retries': 10} + MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', - os.path.dirname(__file__)) + "/../python.d") + "/" + os.path.dirname(__file__)) + "/../python.d") + "/" CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/") -UPDATE_EVERY = os.getenv('NETDATA_UPDATE_EVERY', 1) # directories should end with '/' if CONFIG_DIR[-1] != "/": CONFIG_DIR += "/" sys.path.append(MODULES_DIR + "python_modules") -BASE_CONFIG = {'update_every': UPDATE_EVERY, - 'priority': 90000, - 'retries': 10} +PROGRAM = os.path.basename(__file__).replace(".plugin", "") +DEBUG_FLAG = False +TRACE_FLAG = False +OVERRIDE_UPDATE_EVERY = False + +# ----------------------------------------------------------------------------- +# custom, third party and version specific python modules management +import msg try: assert sys.version_info >= (3, 1) import importlib.machinery - + PY_VERSION = 3 # change this hack below if we want PY_VERSION to be used in modules # import builtins # builtins.PY_VERSION = 3 - PY_VERSION = 3 - info('Using python v3') + msg.info('Using python v3') except (AssertionError, ImportError): try: import imp @@ -101,27 +52,53 @@ except (AssertionError, ImportError): # import __builtin__ # __builtin__.PY_VERSION = 2 PY_VERSION = 2 - info('Using python v2') + msg.info('Using python v2') except ImportError: - fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') + msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') +# try: +# import yaml +# except ImportError: +# msg.fatal('Cannot find yaml library') try: - import yaml + if PY_VERSION == 3: + import pyyaml3 as yaml + else: + import pyyaml2 as yaml except ImportError: - fatal('Cannot find yaml library') + msg.fatal('Cannot find yaml library') +try: + from collections import OrderedDict + ORDERED = True + DICT = OrderedDict + msg.info('YAML output is ordered') +except ImportError: + ORDERED = False + DICT = dict + msg.info('YAML output is unordered') +else: + def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict): + class OrderedLoader(Loader): + pass + def construct_mapping(loader, node): + loader.flatten_mapping(node) + return object_pairs_hook(loader.construct_pairs(node)) + OrderedLoader.add_constructor( + yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, + construct_mapping) + return yaml.load(stream, OrderedLoader) class PythonCharts(object): """ Main class used to control every python module. """ + def __init__(self, - update_every=None, # TODO remove this modules=None, modules_path='../python.d/', modules_configs='../conf.d/', modules_disabled=None): """ - :param update_every: int :param modules: list :param modules_path: str :param modules_configs: str @@ -145,9 +122,11 @@ class PythonCharts(object): # good economy and prosperity: self.jobs = self._create_jobs(configured_modules) # type: list -# if DEBUG_FLAG and update_every is not None: -# for job in self.jobs: -# job.create_timetable(update_every) + + # enable timetable override like `python.d.plugin mysql debug 1` + if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY: + for job in self.jobs: + job.create_timetable(BASE_CONFIG['update_every']) @staticmethod def _import_module(path, name=None): @@ -169,7 +148,7 @@ class PythonCharts(object): else: return imp.load_source(name, path) except Exception as e: - debug(str(e)) + msg.error("Problem loading", name, str(e)) return None def _load_modules(self, path, modules, disabled): @@ -183,7 +162,7 @@ class PythonCharts(object): # check if plugin directory exists if not os.path.isdir(path): - fatal("cannot find charts directory ", path) + msg.fatal("cannot find charts directory ", path) # load modules loaded = [] @@ -195,17 +174,17 @@ class PythonCharts(object): if mod is not None: loaded.append(mod) else: # exit if plugin is not found - fatal('no modules found.') + msg.fatal('no modules found.') else: # scan directory specified in path and load all modules from there names = os.listdir(path) for mod in names: - if mod.strip(MODULE_EXTENSION) in disabled: - error(mod + ": disabled module ", mod.strip(MODULE_EXTENSION)) + if mod.replace(MODULE_EXTENSION, "") in disabled: + msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, "")) continue m = self._import_module(path + mod) if m is not None: - debug(mod + ": loading module '" + path + mod + "'") + msg.debug(mod + ": loading module '" + path + mod + "'") loaded.append(m) return loaded @@ -220,17 +199,20 @@ class PythonCharts(object): for mod in modules: configfile = self.configs + mod.__name__ + ".conf" if os.path.isfile(configfile): - debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") + msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") try: + if not hasattr(mod, 'config'): + mod.config = {} setattr(mod, 'config', self._parse_config(mod, read_config(configfile))) except Exception as e: - error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) + msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) else: - error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") + msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") # set config if not found if not hasattr(mod, 'config'): + msg.debug(mod.__name__ + ": setting configuration for only one job") mod.config = {None: {}} for var in BASE_CONFIG: try: @@ -253,8 +235,11 @@ class PythonCharts(object): :param config: dict :return: dict """ + if config is None: + config = {} # get default values defaults = {} + msg.debug(module.__name__ + ": reading configuration") for key in BASE_CONFIG: try: # get defaults from module config @@ -263,14 +248,14 @@ class PythonCharts(object): try: # get defaults from module source code defaults[key] = getattr(module, key) - except (KeyError, ValueError): + except (KeyError, ValueError, AttributeError): # if above failed, get defaults from global dict defaults[key] = BASE_CONFIG[key] # check if there are dict in config dict many_jobs = False for name in config: - if type(config[name]) is dict: + if isinstance(config[name], DICT): many_jobs = True break @@ -303,10 +288,10 @@ class PythonCharts(object): try: job = module.Service(configuration=conf, name=name) except Exception as e: - error(module.__name__ + - ("/" + str(name) if name is not None else "") + - ": cannot start job: '" + - str(e)) + msg.error(module.__name__ + + ("/" + str(name) if name is not None else "") + + ": cannot start job: '" + + str(e)) return None else: # set chart_name (needed to plot run time graphs) @@ -314,7 +299,7 @@ class PythonCharts(object): if name is not None: job.chart_name += "_" + name jobs.append(job) - debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") + msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") return [j for j in jobs if j is not None] @@ -326,30 +311,35 @@ class PythonCharts(object): :param reason: str """ prefix = job.__module__ - if job.name is not None: + if job.name is not None and len(job.name) != 0: prefix += "/" + job.name - prefix += ": " + try: + msg.error("DISABLED:", prefix) + self.jobs.remove(job) + except Exception as e: + msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e)) - self.jobs.remove(job) + # TODO remove section below and remove `reason`. + prefix += ": " if reason is None: return elif reason[:3] == "no ": - error(prefix + - "does not seem to have " + - reason[3:] + - "() function. Disabling it.") + msg.error(prefix + + "does not seem to have " + + reason[3:] + + "() function. Disabling it.") elif reason[:7] == "failed ": - error(prefix + - reason[7:] + - "() function reports failure.") + msg.error(prefix + + reason[7:] + + "() function reports failure.") elif reason[:13] == "configuration": - error(prefix + - "configuration file '" + - self.configs + - job.__module__ + - ".conf' not found. Using defaults.") + msg.error(prefix + + "configuration file '" + + self.configs + + job.__module__ + + ".conf' not found. Using defaults.") elif reason[:11] == "misbehaving": - error(prefix + "is " + reason) + msg.error(prefix + "is " + reason) def check(self): """ @@ -358,17 +348,40 @@ class PythonCharts(object): If job.check() fails job is stopped """ i = 0 + overridden = [] + msg.debug("all job objects", str(self.jobs)) while i < len(self.jobs): job = self.jobs[i] try: if not job.check(): - self._stop(job, "failed check") + msg.error(job.chart_name, "check() failed - disabling job") + self._stop(job) else: + msg.info("CHECKED OK:", job.chart_name) i += 1 - except AttributeError: - self._stop(job, "no check") + try: + if job.override_name is not None: + new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name) + if new_name in overridden: + msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.") + self._stop(job) + i -= 1 + else: + job.name = job.override_name + msg.info("RENAMED:", new_name, ", from " + job.chart_name) + job.chart_name = new_name + overridden.append(job.chart_name) + except Exception: + pass + except AttributeError as e: + self._stop(job) + msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.") + msg.debug(str(e)) except (UnboundLocalError, Exception) as e: - self._stop(job, "misbehaving. Reason: " + str(e)) + msg.error(job.chart_name, str(e)) + self._stop(job) + msg.debug("overridden job names:", str(overridden)) + msg.debug("all remaining job objects:", str(self.jobs)) def create(self): """ @@ -382,7 +395,8 @@ class PythonCharts(object): job = self.jobs[i] try: if not job.create(): - self._stop(job, "failed create") + msg.error(job.chart_name, "create function failed.") + self._stop(job) else: chart = job.chart_name sys.stdout.write( @@ -394,79 +408,28 @@ class PythonCharts(object): str(job.timetable['freq']) + '\n') sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n") + msg.debug("created charts for", job.chart_name) # sys.stdout.flush() i += 1 except AttributeError: - self._stop(job, "no create") + msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.") + self._stop(job) except (UnboundLocalError, Exception) as e: - self._stop(job, "misbehaving. Reason: " + str(e)) - - def _update_job(self, job): - """ - Tries to execute update() on specified job. - This cannot fail thus it is catching every exception. - If job.update() returns False, number of retries_left is decremented. - If there are no more retries, job is stopped. - Job is also stopped if it throws an exception. - This is also updating job run time chart. - Return False if job is stopped - :param job: object - :return: boolean - """ - t_start = time.time() - # check if it is time to execute job update() function - if job.timetable['next'] > t_start: - return True - try: - if self.first_run: - since_last = 0 - else: - since_last = int((t_start - job.timetable['last']) * 1000000) - if not job.update(since_last): - if job.retries_left <= 0: - self._stop(job, "update failed") - return False - job.retries_left -= 1 - job.timetable['next'] += job.timetable['freq'] - return True - except AttributeError: - self._stop(job, "no update") - return False - except (UnboundLocalError, Exception) as e: - self._stop(job, "misbehaving. Reason: " + str(e)) - return False - t_end = time.time() - job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq'] - # draw performance graph - sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n') - sys.stdout.write("SET run_time = " + str(int((t_end - t_start) * 1000)) + '\n') - sys.stdout.write("END\n") - # sys.stdout.flush() - job.timetable['last'] = t_start - job.retries_left = job.retries - self.first_run = False - return True + msg.error(job.chart_name, str(e)) + self._stop(job) def update(self): """ - Tries to execute update() on every job by using _update_job() + Creates and supervises every job thread. This will stay forever and ever and ever forever and ever it'll be the one... """ - self.first_run = True + for job in self.jobs: + job.start() + while True: - next_runs = [] - i = 0 - while i < len(self.jobs): - job = self.jobs[i] - if self._update_job(job): - try: - next_runs.append(job.timetable['next']) - except KeyError: - pass - i += 1 - if len(next_runs) == 0: - fatal('no python.d modules loaded.') - time.sleep(min(next_runs) - time.time()) + if threading.active_count() <= 1: + msg.fatal("no more jobs") + time.sleep(1) def read_config(path): @@ -477,12 +440,15 @@ def read_config(path): """ try: with open(path, 'r') as stream: - config = yaml.load(stream) + if ORDERED: + config = ordered_load(stream, yaml.SafeLoader) + else: + config = yaml.load(stream) except (OSError, IOError): - error(str(path), "is not a valid configuration file") + msg.error(str(path), "is not a valid configuration file") return None except yaml.YAMLError as e: - error(str(path), "is malformed:", e) + msg.error(str(path), "is malformed:", e) return None return config @@ -494,10 +460,11 @@ def parse_cmdline(directory, *commands): :param commands: list of str :return: dict """ - global DEBUG_FLAG - global UPDATE_EVERY - update_every = UPDATE_EVERY + global DEBUG_FLAG, TRACE_FLAG + global OVERRIDE_UPDATE_EVERY + global BASE_CONFIG + changed_update = False mods = [] for cmd in commands[1:]: if cmd == "check": @@ -505,18 +472,23 @@ def parse_cmdline(directory, *commands): elif cmd == "debug" or cmd == "all": DEBUG_FLAG = True # redirect stderr to stdout? + elif cmd == "trace" or cmd == "all": + TRACE_FLAG = True elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd): - DEBUG_FLAG = True + #DEBUG_FLAG = True mods.append(cmd.replace(".chart.py", "")) else: try: - update_every = int(cmd) + BASE_CONFIG['update_every'] = int(cmd) + changed_update = True except ValueError: - update_every = UPDATE_EVERY + pass + if changed_update and DEBUG_FLAG: + OVERRIDE_UPDATE_EVERY = True + msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every'])) - debug("started from", commands[0], "with options:", *commands[1:]) + msg.debug("started from", commands[0], "with options:", *commands[1:]) - UPDATE_EVERY = update_every return mods @@ -525,49 +497,76 @@ def run(): """ Main program. """ - global PROGRAM, DEBUG_FLAG - PROGRAM = sys.argv[0].split('/')[-1].split('.plugin')[0] + global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG # read configuration file disabled = [] configfile = CONFIG_DIR + "python.d.conf" + msg.PROGRAM = PROGRAM + msg.info("reading configuration file:", configfile) + log_throttle = 200 + log_interval = 3600 - update_every = UPDATE_EVERY conf = read_config(configfile) if conf is not None: try: - # exit the whole plugin when 'enable: no' is set in 'python.d.conf' - if str(conf['enable']) is False: - fatal('disabled in configuration file.\n') + # exit the whole plugin when 'enabled: no' is set in 'python.d.conf' + if conf['enabled'] is False: + msg.fatal('disabled in configuration file.\n') except (KeyError, TypeError): pass + try: - update_every = conf['update_every'] + for param in BASE_CONFIG: + BASE_CONFIG[param] = conf[param] except (KeyError, TypeError): pass # use default update_every from NETDATA_UPDATE_EVERY + try: DEBUG_FLAG = conf['debug'] except (KeyError, TypeError): pass + + try: + TRACE_FLAG = conf['trace'] + except (KeyError, TypeError): + pass + + try: + log_throttle = conf['logs_per_interval'] + except (KeyError, TypeError): + pass + + try: + log_interval = conf['log_interval'] + except (KeyError, TypeError): + pass + for k, v in conf.items(): - if k in ("update_every", "debug", "enable"): + if k in ("update_every", "debug", "enabled"): continue if v is False: disabled.append(k) # parse passed command line arguments modules = parse_cmdline(MODULES_DIR, *sys.argv) - info("MODULES_DIR='" + MODULES_DIR + - "', CONFIG_DIR='" + CONFIG_DIR + - "', UPDATE_EVERY=" + str(UPDATE_EVERY) + - ", ONLY_MODULES=" + str(modules)) + msg.DEBUG_FLAG = DEBUG_FLAG + msg.TRACE_FLAG = TRACE_FLAG + msg.LOG_THROTTLE = log_throttle + msg.LOG_INTERVAL = log_interval + msg.LOG_COUNTER = 0 + msg.LOG_NEXT_CHECK = 0 + msg.info("MODULES_DIR='" + MODULES_DIR + + "', CONFIG_DIR='" + CONFIG_DIR + + "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + + ", ONLY_MODULES=" + str(modules)) # run plugins - charts = PythonCharts(update_every, modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled) + charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled) charts.check() charts.create() charts.update() - fatal("finished") + msg.fatal("finished") if __name__ == '__main__':