X-Git-Url: https://arthur.barton.de/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=plugins.d%2Fpython.d.plugin;h=efa62cbc5a34affbd86651147a50e08871ff7e0f;hb=HEAD;hp=6d7f36b3169cdad2be84ff5d73ffbbd621651974;hpb=661f5455e8e2d8571ad4612ee7d2b757ee461eaa;p=netdata.git diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin index 6d7f36b3..efa62cbc 100755 --- a/plugins.d/python.d.plugin +++ b/plugins.d/python.d.plugin @@ -1,4 +1,5 @@ -#!/usr/bin/env python +#!/usr/bin/env bash +'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' # -*- coding: utf-8 -*- # Description: netdata python modules supervisor @@ -7,6 +8,8 @@ import os import sys import time +import threading +from re import sub # ----------------------------------------------------------------------------- # globals & environment setup @@ -17,7 +20,7 @@ BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), 'retries': 10} MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', - os.path.dirname(__file__)) + "/../python.d") + "/" + os.path.dirname(__file__)) + "/../python.d") + "/" CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/") # directories should end with '/' if CONFIG_DIR[-1] != "/": @@ -26,71 +29,21 @@ sys.path.append(MODULES_DIR + "python_modules") PROGRAM = os.path.basename(__file__).replace(".plugin", "") DEBUG_FLAG = False +TRACE_FLAG = False OVERRIDE_UPDATE_EVERY = False - # ----------------------------------------------------------------------------- -# logging functions -def log_msg(msg_type, *args): - """ - Print message on stderr. - :param msg_type: str - """ - sys.stderr.write(PROGRAM) - sys.stderr.write(" ") - sys.stderr.write(msg_type) - sys.stderr.write(": ") - for i in args: - sys.stderr.write(" ") - sys.stderr.write(str(i)) - sys.stderr.write("\n") - sys.stderr.flush() - - -def debug(*args): - """ - Print debug message on stderr. - """ - if not DEBUG_FLAG: - return - - log_msg("DEBUG", *args) - - -def error(*args): - """ - Print message on stderr. - """ - log_msg("ERROR", *args) - - -def info(*args): - """ - Print message on stderr. - """ - log_msg("INFO", *args) - - -def fatal(*args): - """ - Print message on stderr and exit. - """ - log_msg("FATAL", *args) - sys.stdout.write('DISABLE\n') - sys.exit(1) +# custom, third party and version specific python modules management +import msg - -# ----------------------------------------------------------------------------- -# third party and version specific python modules management try: assert sys.version_info >= (3, 1) import importlib.machinery - + PY_VERSION = 3 # change this hack below if we want PY_VERSION to be used in modules # import builtins # builtins.PY_VERSION = 3 - PY_VERSION = 3 - info('Using python v3') + msg.info('Using python v3') except (AssertionError, ImportError): try: import imp @@ -99,29 +52,69 @@ except (AssertionError, ImportError): # import __builtin__ # __builtin__.PY_VERSION = 2 PY_VERSION = 2 - info('Using python v2') + msg.info('Using python v2') except ImportError: - fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') + msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') +# try: +# import yaml +# except ImportError: +# msg.fatal('Cannot find yaml library') try: - import yaml + if PY_VERSION == 3: + import pyyaml3 as yaml + else: + import pyyaml2 as yaml except ImportError: - fatal('Cannot find yaml library') + msg.fatal('Cannot find yaml library') + +try: + from collections import OrderedDict + ORDERED = True + DICT = OrderedDict + msg.info('YAML output is ordered') +except ImportError: + try: + from ordereddict import OrderedDict + ORDERED = True + DICT = OrderedDict + msg.info('YAML output is ordered') + except ImportError: + ORDERED = False + DICT = dict + msg.info('YAML output is unordered') +if ORDERED: + def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict): + class OrderedLoader(Loader): + pass + + def construct_mapping(loader, node): + loader.flatten_mapping(node) + return object_pairs_hook(loader.construct_pairs(node)) + OrderedLoader.add_constructor( + yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, + construct_mapping) + return yaml.load(stream, OrderedLoader) class PythonCharts(object): """ Main class used to control every python module. """ + def __init__(self, modules=None, modules_path='../python.d/', modules_configs='../conf.d/', - modules_disabled=None): + modules_disabled=None, + modules_enabled=None, + default_run=None): """ :param modules: list :param modules_path: str :param modules_configs: str :param modules_disabled: list + :param modules_enabled: list + :param default_run: bool """ if modules is None: @@ -134,13 +127,13 @@ class PythonCharts(object): self.configs = modules_configs # load modules - loaded_modules = self._load_modules(modules_path, modules, modules_disabled) + loaded_modules = self._load_modules(modules_path, modules, modules_disabled, modules_enabled, default_run) # load configuration files configured_modules = self._load_configs(loaded_modules) # good economy and prosperity: - self.jobs = self._create_jobs(configured_modules) # type: list + self.jobs = self._create_jobs(configured_modules) # type # enable timetable override like `python.d.plugin mysql debug 1` if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY: @@ -167,10 +160,10 @@ class PythonCharts(object): else: return imp.load_source(name, path) except Exception as e: - debug(str(e)) + msg.error("Problem loading", name, str(e)) return None - def _load_modules(self, path, modules, disabled): + def _load_modules(self, path, modules, disabled, enabled, default_run): """ Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files) :param path: str @@ -181,7 +174,7 @@ class PythonCharts(object): # check if plugin directory exists if not os.path.isdir(path): - fatal("cannot find charts directory ", path) + msg.fatal("cannot find charts directory ", path) # load modules loaded = [] @@ -193,17 +186,20 @@ class PythonCharts(object): if mod is not None: loaded.append(mod) else: # exit if plugin is not found - fatal('no modules found.') + msg.fatal('no modules found.') else: # scan directory specified in path and load all modules from there - names = os.listdir(path) + if default_run is False: + names = [module for module in os.listdir(path) if module[:-9] in enabled] + else: + names = os.listdir(path) for mod in names: - if mod.strip(MODULE_EXTENSION) in disabled: - error(mod + ": disabled module ", mod.strip(MODULE_EXTENSION)) + if mod.replace(MODULE_EXTENSION, "") in disabled: + msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, "")) continue m = self._import_module(path + mod) if m is not None: - debug(mod + ": loading module '" + path + mod + "'") + msg.debug(mod + ": loading module '" + path + mod + "'") loaded.append(m) return loaded @@ -218,18 +214,20 @@ class PythonCharts(object): for mod in modules: configfile = self.configs + mod.__name__ + ".conf" if os.path.isfile(configfile): - debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") + msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") try: + if not hasattr(mod, 'config'): + mod.config = {} setattr(mod, 'config', self._parse_config(mod, read_config(configfile))) except Exception as e: - error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) + msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) else: - error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") + msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") # set config if not found if not hasattr(mod, 'config'): - debug(mod.__name__ + ": setting configuration for only one job") + msg.debug(mod.__name__ + ": setting configuration for only one job") mod.config = {None: {}} for var in BASE_CONFIG: try: @@ -252,10 +250,12 @@ class PythonCharts(object): :param config: dict :return: dict """ + if config is None: + config = {} # get default values defaults = {} + msg.debug(module.__name__ + ": reading configuration") for key in BASE_CONFIG: - debug(module.__name__ + ": reading configuration") try: # get defaults from module config defaults[key] = int(config.pop(key)) @@ -263,14 +263,14 @@ class PythonCharts(object): try: # get defaults from module source code defaults[key] = getattr(module, key) - except (KeyError, ValueError): + except (KeyError, ValueError, AttributeError): # if above failed, get defaults from global dict defaults[key] = BASE_CONFIG[key] # check if there are dict in config dict many_jobs = False for name in config: - if type(config[name]) is dict: + if isinstance(config[name], DICT): many_jobs = True break @@ -303,10 +303,10 @@ class PythonCharts(object): try: job = module.Service(configuration=conf, name=name) except Exception as e: - error(module.__name__ + - ("/" + str(name) if name is not None else "") + - ": cannot start job: '" + - str(e)) + msg.error(module.__name__ + + ("/" + str(name) if name is not None else "") + + ": cannot start job: '" + + str(e)) return None else: # set chart_name (needed to plot run time graphs) @@ -314,7 +314,7 @@ class PythonCharts(object): if name is not None: job.chart_name += "_" + name jobs.append(job) - debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") + msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") return [j for j in jobs if j is not None] @@ -326,30 +326,35 @@ class PythonCharts(object): :param reason: str """ prefix = job.__module__ - if job.name is not None: + if job.name is not None and len(job.name) != 0: prefix += "/" + job.name - prefix += ": " + try: + msg.error("DISABLED:", prefix) + self.jobs.remove(job) + except Exception as e: + msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e)) - self.jobs.remove(job) + # TODO remove section below and remove `reason`. + prefix += ": " if reason is None: return elif reason[:3] == "no ": - error(prefix + - "does not seem to have " + - reason[3:] + - "() function. Disabling it.") + msg.error(prefix + + "does not seem to have " + + reason[3:] + + "() function. Disabling it.") elif reason[:7] == "failed ": - error(prefix + - reason[7:] + - "() function reports failure.") + msg.error(prefix + + reason[7:] + + "() function reports failure.") elif reason[:13] == "configuration": - error(prefix + - "configuration file '" + - self.configs + - job.__module__ + - ".conf' not found. Using defaults.") + msg.error(prefix + + "configuration file '" + + self.configs + + job.__module__ + + ".conf' not found. Using defaults.") elif reason[:11] == "misbehaving": - error(prefix + "is " + reason) + msg.error(prefix + "is " + reason) def check(self): """ @@ -358,17 +363,41 @@ class PythonCharts(object): If job.check() fails job is stopped """ i = 0 + overridden = [] + msg.debug("all job objects", str(self.jobs)) while i < len(self.jobs): job = self.jobs[i] try: if not job.check(): - self._stop(job, "failed check") + msg.error(job.chart_name, "check() failed - disabling job") + self._stop(job) else: + msg.info("CHECKED OK:", job.chart_name) i += 1 - except AttributeError: - self._stop(job, "no check") + try: + if job.override_name is not None: + new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name) + if new_name in overridden: + msg.info("DROPPED:", job.name, ", job '" + job.override_name + + "' is already served by another job.") + self._stop(job) + i -= 1 + else: + job.name = job.override_name + msg.info("RENAMED:", new_name, ", from " + job.chart_name) + job.chart_name = new_name + overridden.append(job.chart_name) + except Exception: + pass + except AttributeError as e: + self._stop(job) + msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.") + msg.debug(str(e)) except (UnboundLocalError, Exception) as e: - self._stop(job, "misbehaving. Reason: " + str(e)) + msg.error(job.chart_name, str(e)) + self._stop(job) + msg.debug("overridden job names:", str(overridden)) + msg.debug("all remaining job objects:", str(self.jobs)) def create(self): """ @@ -382,9 +411,9 @@ class PythonCharts(object): job = self.jobs[i] try: if not job.create(): - self._stop(job, "failed create") + msg.error(job.chart_name, "create function failed.") + self._stop(job) else: - debug("created charts for", job.chart_name) chart = job.chart_name sys.stdout.write( "CHART netdata.plugin_pythond_" + @@ -395,81 +424,28 @@ class PythonCharts(object): str(job.timetable['freq']) + '\n') sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n") + msg.debug("created charts for", job.chart_name) # sys.stdout.flush() i += 1 except AttributeError: - self._stop(job, "no create") + msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.") + self._stop(job) except (UnboundLocalError, Exception) as e: - self._stop(job, "misbehaving. Reason: " + str(e)) - - def _update_job(self, job): - """ - Tries to execute update() on specified job. - This cannot fail thus it is catching every exception. - If job.update() returns False, number of retries_left is decremented. - If there are no more retries, job is stopped. - Job is also stopped if it throws an exception. - This is also updating job run time chart. - Return False if job is stopped - :param job: object - :return: boolean - """ - t_start = time.time() - # check if it is time to execute job update() function - if job.timetable['next'] > t_start: - return True - try: - if self.first_run: - since_last = 0 - else: - since_last = int((t_start - job.timetable['last']) * 1000000) - if not job.update(since_last): - if job.retries_left <= 0: - self._stop(job, "update failed") - return False - job.retries_left -= 1 - job.timetable['next'] += job.timetable['freq'] - return True - except AttributeError: - self._stop(job, "no update") - return False - except (UnboundLocalError, Exception) as e: - self._stop(job, "misbehaving. Reason: " + str(e)) - return False - t_end = time.time() - job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq'] - # draw performance graph - run_time = str(int((t_end - t_start) * 1000)) - debug(job.chart_name, "updated in", run_time) - sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n') - sys.stdout.write("SET run_time = " + run_time + '\n') - sys.stdout.write("END\n") - # sys.stdout.flush() - job.timetable['last'] = t_start - job.retries_left = job.retries - self.first_run = False - return True + msg.error(job.chart_name, str(e)) + self._stop(job) def update(self): """ - Tries to execute update() on every job by using _update_job() + Creates and supervises every job thread. This will stay forever and ever and ever forever and ever it'll be the one... """ - self.first_run = True + for job in self.jobs: + job.start() + while True: - next_runs = [] - i = 0 - while i < len(self.jobs): - job = self.jobs[i] - if self._update_job(job): - try: - next_runs.append(job.timetable['next']) - except KeyError: - pass - i += 1 - if len(next_runs) == 0: - fatal('no python.d modules loaded.') - time.sleep(min(next_runs) - time.time()) + if threading.active_count() <= 1: + msg.fatal("no more jobs") + time.sleep(1) def read_config(path): @@ -480,12 +456,15 @@ def read_config(path): """ try: with open(path, 'r') as stream: - config = yaml.load(stream) + if ORDERED: + config = ordered_load(stream, yaml.SafeLoader) + else: + config = yaml.load(stream) except (OSError, IOError): - error(str(path), "is not a valid configuration file") + msg.error(str(path), "is not a valid configuration file") return None except yaml.YAMLError as e: - error(str(path), "is malformed:", e) + msg.error(str(path), "is malformed:", e) return None return config @@ -497,10 +476,11 @@ def parse_cmdline(directory, *commands): :param commands: list of str :return: dict """ - global DEBUG_FLAG + global DEBUG_FLAG, TRACE_FLAG global OVERRIDE_UPDATE_EVERY global BASE_CONFIG + changed_update = False mods = [] for cmd in commands[1:]: if cmd == "check": @@ -508,18 +488,22 @@ def parse_cmdline(directory, *commands): elif cmd == "debug" or cmd == "all": DEBUG_FLAG = True # redirect stderr to stdout? + elif cmd == "trace" or cmd == "all": + TRACE_FLAG = True elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd): - DEBUG_FLAG = True + # DEBUG_FLAG = True mods.append(cmd.replace(".chart.py", "")) else: try: BASE_CONFIG['update_every'] = int(cmd) - OVERRIDE_UPDATE_EVERY = True - debug(PROGRAM, "overriding update interval to", str(int(cmd))) + changed_update = True except ValueError: pass + if changed_update and DEBUG_FLAG: + OVERRIDE_UPDATE_EVERY = True + msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every'])) - debug("started from", commands[0], "with options:", *commands[1:]) + msg.debug("started from", commands[0], "with options:", *commands[1:]) return mods @@ -529,50 +513,83 @@ def run(): """ Main program. """ - global DEBUG_FLAG, BASE_CONFIG + global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG # read configuration file - disabled = [] + disabled = ['nginx_log', 'gunicorn_log'] + enabled = list() + default_run = True configfile = CONFIG_DIR + "python.d.conf" - debug(PROGRAM, "reading configuration file:", configfile) + msg.PROGRAM = PROGRAM + msg.info("reading configuration file:", configfile) + log_throttle = 200 + log_interval = 3600 conf = read_config(configfile) if conf is not None: try: # exit the whole plugin when 'enabled: no' is set in 'python.d.conf' if conf['enabled'] is False: - fatal('disabled in configuration file.\n') + msg.fatal('disabled in configuration file.\n') except (KeyError, TypeError): pass + try: for param in BASE_CONFIG: BASE_CONFIG[param] = conf[param] except (KeyError, TypeError): pass # use default update_every from NETDATA_UPDATE_EVERY + try: DEBUG_FLAG = conf['debug'] except (KeyError, TypeError): pass + + try: + TRACE_FLAG = conf['trace'] + except (KeyError, TypeError): + pass + + try: + log_throttle = conf['logs_per_interval'] + except (KeyError, TypeError): + pass + + try: + log_interval = conf['log_interval'] + except (KeyError, TypeError): + pass + + default_run = True if ('default_run' not in conf or conf.get('default_run')) else False + for k, v in conf.items(): - if k in ("update_every", "debug", "enabled"): + if k in ("update_every", "debug", "enabled", "default_run"): continue - if v is False: - disabled.append(k) - + if default_run: + if v is False: + disabled.append(k) + else: + if v is True: + enabled.append(k) # parse passed command line arguments modules = parse_cmdline(MODULES_DIR, *sys.argv) - info("MODULES_DIR='" + MODULES_DIR + - "', CONFIG_DIR='" + CONFIG_DIR + - "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + - ", ONLY_MODULES=" + str(modules)) + msg.DEBUG_FLAG = DEBUG_FLAG + msg.TRACE_FLAG = TRACE_FLAG + msg.LOG_THROTTLE = log_throttle + msg.LOG_INTERVAL = log_interval + msg.LOG_COUNTER = 0 + msg.LOG_NEXT_CHECK = 0 + msg.info("MODULES_DIR='" + MODULES_DIR + + "', CONFIG_DIR='" + CONFIG_DIR + + "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + + ", ONLY_MODULES=" + str(modules)) # run plugins - modules = ['nginx'] - charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled) + charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled, enabled, default_run) charts.check() charts.create() charts.update() - fatal("finished") + msg.fatal("finished") if __name__ == '__main__':