#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Description: netdata python modules supervisor
# Author: Pawel Krupa (paulfantom)

import os
import sys
import time

# -----------------------------------------------------------------------------
# globals & environment setup
# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
MODULE_EXTENSION = ".chart.py"
# NOTE: os.getenv() returns a str when NETDATA_UPDATE_EVERY is set, so
# 'update_every' may be a str here; _parse_config() and parse_cmdline()
# convert values they read themselves to int.
BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
               'priority': 90000,
               'retries': 10}

MODULES_DIR = os.path.abspath(
    os.getenv('NETDATA_PLUGINS_DIR', os.path.dirname(__file__)) +
    "/../python.d") + "/"
CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
# directories should end with '/'
# (endswith() instead of [-1] so an empty value does not raise IndexError)
if not CONFIG_DIR.endswith("/"):
    CONFIG_DIR += "/"
sys.path.append(MODULES_DIR + "python_modules")

PROGRAM = os.path.basename(__file__).replace(".plugin", "")
DEBUG_FLAG = False
OVERRIDE_UPDATE_EVERY = False


# -----------------------------------------------------------------------------
# logging functions
def log_msg(msg_type, *args):
    """
    Print message on stderr as "<PROGRAM> <msg_type>:  <arg> <arg> ...".
    Every argument is converted with str() and prefixed by a single space.
    :param msg_type: str
    """
    sys.stderr.write(PROGRAM)
    sys.stderr.write(" ")
    sys.stderr.write(msg_type)
    sys.stderr.write(": ")
    for i in args:
        sys.stderr.write(" ")
        sys.stderr.write(str(i))
    sys.stderr.write("\n")
    sys.stderr.flush()


def debug(*args):
    """
    Print debug message on stderr. Does nothing unless DEBUG_FLAG is set.
    """
    if not DEBUG_FLAG:
        return
    log_msg("DEBUG", *args)


def error(*args):
    """
    Print error message on stderr.
    """
    log_msg("ERROR", *args)


def info(*args):
    """
    Print informational message on stderr.
    """
    log_msg("INFO", *args)


def fatal(*args):
    """
    Print message on stderr, write 'DISABLE' on stdout and exit with status 1.
    """
    log_msg("FATAL", *args)
    sys.stdout.write('DISABLE\n')
    sys.exit(1)


# -----------------------------------------------------------------------------
# third party and version specific python modules management
try:
    assert sys.version_info >= (3, 1)
    import importlib.machinery

    # change this hack below if we want PY_VERSION to be used in modules
    # import builtins
    # builtins.PY_VERSION = 3
    PY_VERSION = 3
    info('Using python v3')
except (AssertionError, ImportError):
    try:
        import imp

        # change this hack below if we want PY_VERSION to be used in modules
        # import __builtin__
        # __builtin__.PY_VERSION = 2
        PY_VERSION = 2
        info('Using python v2')
    except ImportError:
        fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
try:
    import yaml
except ImportError:
    fatal('Cannot find yaml library')


class PythonCharts(object):
    """
    Main class used to control every python module.

    Lifecycle, as driven by run(): construct (loads modules, configuration
    and creates jobs), then check(), create() and finally update(), which
    loops forever.
    """

    def __init__(self,
                 modules=None,
                 modules_path='../python.d/',
                 modules_configs='../conf.d/',
                 modules_disabled=None):
        """
        :param modules: list of module names to load (empty: scan modules_path)
        :param modules_path: str, directory with *.chart.py files
        :param modules_configs: str, directory with per-module *.conf files
        :param modules_disabled: list of module names that must not be loaded
        """
        if modules is None:
            modules = []
        if modules_disabled is None:
            modules_disabled = []

        self.first_run = True
        # set configuration directory
        self.configs = modules_configs

        # load modules
        loaded_modules = self._load_modules(modules_path, modules, modules_disabled)

        # load configuration files
        configured_modules = self._load_configs(loaded_modules)

        # good economy and prosperity:
        self.jobs = self._create_jobs(configured_modules)  # type: list

        # enable timetable override like `python.d.plugin mysql debug 1`
        if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
            for job in self.jobs:
                job.create_timetable(BASE_CONFIG['update_every'])

    @staticmethod
    def _import_module(path, name=None):
        """
        Try to import module using only its path.
        When name is not given it is derived from the last path component,
        which must end with MODULE_EXTENSION (otherwise None is returned).
        :param path: str
        :param name: str
        :return: object (loaded module) or None on any failure
        """
        if name is None:
            name = path.split('/')[-1]
            if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
                return None
            name = name[:-len(MODULE_EXTENSION)]
        try:
            if PY_VERSION == 3:
                return importlib.machinery.SourceFileLoader(name, path).load_module()
            else:
                return imp.load_source(name, path)
        except Exception as e:
            # a broken module must not take the whole supervisor down
            error("Problem loading", name, str(e))
            return None

    def _load_modules(self, path, modules, disabled):
        """
        Load modules from 'modules' list or dynamically every file from 'path'
        (only .chart.py files).
        Exits through fatal() if 'path' is not a directory or if an explicitly
        requested module cannot be imported.
        :param path: str
        :param modules: list
        :param disabled: list
        :return: list
        """

        # check if plugin directory exists
        if not os.path.isdir(path):
            fatal("cannot find charts directory ", path)

        # load modules
        loaded = []
        if modules:
            for m in modules:
                if m in disabled:
                    continue
                mod = self._import_module(path + m + MODULE_EXTENSION)
                if mod is not None:
                    loaded.append(mod)
                else:  # exit if plugin is not found
                    fatal('no modules found.')
        else:
            # scan directory specified in path and load all modules from there
            names = os.listdir(path)
            for mod in names:
                mod_name = mod.replace(MODULE_EXTENSION, "")
                if mod_name in disabled:
                    error(mod + ": disabled module ", mod_name)
                    continue
                m = self._import_module(path + mod)
                if m is not None:
                    debug(mod + ": loading module '" + path + mod + "'")
                    loaded.append(m)
        return loaded

    def _load_configs(self, modules):
        """
        Append configuration in list named `config` to every module.
        For multi-job modules `config` list is created in _parse_config,
        otherwise it is created here based on BASE_CONFIG prototype with None as identifier.

        If the configuration file exists but cannot be parsed, a module that
        did not define `config` itself is left with an empty `config` dict,
        i.e. no jobs will be created for it.
        :param modules: list
        :return: list
        """
        for mod in modules:
            configfile = self.configs + mod.__name__ + ".conf"
            if os.path.isfile(configfile):
                debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
                try:
                    if not hasattr(mod, 'config'):
                        mod.config = {}
                    setattr(mod,
                            'config',
                            self._parse_config(mod, read_config(configfile)))
                except Exception as e:
                    error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
            else:
                error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
                # set config if not found
                if not hasattr(mod, 'config'):
                    debug(mod.__name__ + ": setting configuration for only one job")
                    mod.config = {None: {}}
                    for var in BASE_CONFIG:
                        try:
                            mod.config[None][var] = getattr(mod, var)
                        except AttributeError:
                            mod.config[None][var] = BASE_CONFIG[var]
        return modules

    @staticmethod
    def _parse_config(module, config):
        """
        Parse configuration file or extract configuration from module file.
        Example of returned dictionary:
            config = {'name': {
                            'update_every': 2,
                            'retries': 3,
                            'priority': 30000
                            'other_val': 123}}
        Keys listed in BASE_CONFIG are removed from the top level of `config`
        and used as defaults for every job.
        :param module: object
        :param config: dict
        :return: dict
        """
        # get default values
        defaults = {}
        debug(module.__name__ + ": reading configuration")
        for key in BASE_CONFIG:
            try:
                # get defaults from module config
                defaults[key] = int(config.pop(key))
            except (KeyError, ValueError, TypeError):
                # TypeError: key present in YAML but without a value (None)
                try:
                    # get defaults from module source code
                    defaults[key] = getattr(module, key)
                except (KeyError, ValueError, AttributeError):
                    # if above failed, get defaults from global dict
                    defaults[key] = BASE_CONFIG[key]

        # check if there are dict in config dict
        many_jobs = any(isinstance(value, dict) for value in config.values())

        # assign variables needed by supervisor to every job configuration
        if many_jobs:
            for name in config:
                for key in defaults:
                    if key not in config[name]:
                        config[name][key] = defaults[key]
        # if only one job is needed, values doesn't have to be in dict (in YAML)
        else:
            config = {None: config.copy()}
            config[None].update(defaults)

        # return dictionary of jobs where every job has BASE_CONFIG variables
        return config

    @staticmethod
    def _create_jobs(modules):
        """
        Create jobs based on module.config dictionary and module.Service class definition.
        A job whose Service() constructor raises is reported and skipped;
        the remaining jobs are still created.
        :param modules: list
        :return: list
        """
        jobs = []
        for module in modules:
            for name in module.config:
                # register a new job
                conf = module.config[name]
                label = module.__name__ + ("/" + str(name) if name is not None else "")
                try:
                    job = module.Service(configuration=conf, name=name)
                except Exception as e:
                    error(label + ": cannot start job: '" + str(e) + "'")
                    # skip only this job; returning here would drop every
                    # job and leave the caller with None instead of a list
                    continue
                # set chart_name (needed to plot run time graphs)
                job.chart_name = module.__name__
                if name is not None:
                    # YAML keys are not guaranteed to be strings
                    job.chart_name += "_" + str(name)
                jobs.append(job)
                debug(label + ": job added")

        return jobs

    def _stop(self, job, reason=None):
        """
        Stop specified job and remove it from self.jobs list.
        Also reports the failure on stderr; the text is selected by the
        prefix of `reason`: "no <func>", "failed <func>", "configuration"
        or "misbehaving...". Any other reason removes the job silently.
        :param job: object
        :param reason: str
        """
        prefix = job.__module__
        if job.name is not None:
            prefix += "/" + job.name
        prefix += ": "

        self.jobs.remove(job)
        if reason is None:
            return
        elif reason.startswith("no "):
            error(prefix +
                  "does not seem to have " +
                  reason[3:] +
                  "() function. Disabling it.")
        elif reason.startswith("failed "):
            error(prefix +
                  reason[7:] +
                  "() function reports failure.")
        elif reason.startswith("configuration"):
            error(prefix +
                  "configuration file '" +
                  self.configs +
                  job.__module__ +
                  ".conf' not found. Using defaults.")
        elif reason.startswith("misbehaving"):
            error(prefix + "is " + reason)

    def check(self):
        """
        Tries to execute check() on every job.
        This cannot fail thus it is catching every exception
        If job.check() fails job is stopped
        """
        i = 0
        while i < len(self.jobs):
            job = self.jobs[i]
            try:
                if not job.check():
                    # _stop() removes the job, so index i already points
                    # at the next one and must not be advanced
                    self._stop(job, "failed check")
                else:
                    debug(job.chart_name, ": check succeeded")
                    i += 1
            except AttributeError:
                self._stop(job, "no check")
            except Exception as e:
                self._stop(job, "misbehaving. Reason:" + str(e))

    def create(self):
        """
        Tries to execute create() on every job.
        This cannot fail thus it is catching every exception.
        If job.create() fails job is stopped.
        This is also creating job run time chart.
        """
        i = 0
        while i < len(self.jobs):
            job = self.jobs[i]
            try:
                if not job.create():
                    self._stop(job, "failed create")
                else:
                    chart = job.chart_name
                    sys.stdout.write(
                        "CHART netdata.plugin_pythond_" +
                        chart +
                        " '' 'Execution time for " +
                        chart +
                        " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
                        str(job.timetable['freq']) +
                        '\n')
                    sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
                    debug("created charts for", job.chart_name)
                    # sys.stdout.flush()
                    i += 1
            except AttributeError:
                self._stop(job, "no create")
            except Exception as e:
                self._stop(job, "misbehaving. Reason: " + str(e))

    def _update_job(self, job):
        """
        Tries to execute update() on specified job.
        This cannot fail thus it is catching every exception.
        If job.update() returns False, number of retries_left is decremented.
        If there are no more retries, job is stopped.
        Job is also stopped if it throws an exception.
        This is also updating job run time chart.
        Return False if job is stopped
        :param job: object
        :return: boolean
        """
        t_start = time.time()
        # check if it is time to execute job update() function
        if job.timetable['next'] > t_start:
            debug(job.chart_name + " will be run in " +
                  str(int((job.timetable['next'] - t_start) * 1000)) + " ms")
            return True
        try:
            if self.first_run:
                since_last = 0
            else:
                # microseconds elapsed since the previous successful update
                since_last = int((t_start - job.timetable['last']) * 1000000)
                debug(job.chart_name +
                      " ready to run, after " + str(int((t_start - job.timetable['last']) * 1000)) +
                      " ms (update_every: " + str(job.timetable['freq'] * 1000) +
                      " ms, latency: " + str(int((t_start - job.timetable['next']) * 1000)) + " ms)")
            if not job.update(since_last):
                if job.retries_left <= 0:
                    # "failed <func>" is the form _stop() recognises, so the
                    # removal of the job is actually reported
                    self._stop(job, "failed update")
                    return False
                job.retries_left -= 1
                job.timetable['next'] += job.timetable['freq']
                return True
        except AttributeError:
            self._stop(job, "no update")
            return False
        except Exception as e:
            self._stop(job, "misbehaving. Reason: " + str(e))
            return False
        t_end = time.time()
        # schedule the next run on the next multiple of 'freq'
        job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
        # draw performance graph
        run_time = str(int((t_end - t_start) * 1000))
        debug(job.chart_name, "updated in", run_time, "ms")
        sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n')
        sys.stdout.write("SET run_time = " + run_time + '\n')
        sys.stdout.write("END\n")
        # sys.stdout.flush()
        job.timetable['last'] = t_start
        job.retries_left = job.retries
        self.first_run = False
        return True

    def update(self):
        """
        Tries to execute update() on every job by using _update_job()
        This will stay forever and ever and ever forever and ever it'll be the one...
        Exits through fatal() once no job is left.
        """
        self.first_run = True
        while True:
            next_runs = []
            i = 0
            while i < len(self.jobs):
                job = self.jobs[i]
                # a False result means the job was removed from self.jobs,
                # so the index must stay where it is
                if self._update_job(job):
                    try:
                        next_runs.append(job.timetable['next'])
                    except KeyError:
                        pass
                    i += 1
            if not next_runs:
                fatal('no python.d modules loaded.')
            # a job may already be overdue; time.sleep() rejects negatives
            time.sleep(max(0, min(next_runs) - time.time()))


def read_config(path):
    """
    Read YAML configuration from specified file
    :param path: str
    :return: parsed YAML document (normally a dict) or None if the file
             cannot be read or is malformed
    """
    try:
        with open(path, 'r') as stream:
            # safe_load: configuration only needs plain YAML types, and
            # yaml.load() without an explicit Loader can construct arbitrary
            # objects (and is rejected outright by recent PyYAML releases)
            config = yaml.safe_load(stream)
    except (OSError, IOError):
        error(str(path), "is not a valid configuration file")
        return None
    except yaml.YAMLError as e:
        error(str(path), "is malformed:", e)
        return None
    return config


def parse_cmdline(directory, *commands):
    """
    Parse parameters from command line.
    Recognised arguments: "check" (ignored), "debug"/"all" (enable debug),
    a module name or module file name found in `directory` (enable debug and
    run only that module) and an integer (new update_every value).
    Anything else is ignored.
    :param directory: str
    :param commands: list of str, commands[0] is the program path
    :return: list of module names requested on the command line
    """
    global DEBUG_FLAG
    global OVERRIDE_UPDATE_EVERY
    global BASE_CONFIG

    changed_update = False
    mods = []
    for cmd in commands[1:]:
        if cmd == "check":
            pass
        elif cmd == "debug" or cmd == "all":
            DEBUG_FLAG = True
            # redirect stderr to stdout?
        elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
            DEBUG_FLAG = True
            mods.append(cmd.replace(".chart.py", ""))
        else:
            try:
                BASE_CONFIG['update_every'] = int(cmd)
                changed_update = True
            except ValueError:
                pass
    if changed_update and DEBUG_FLAG:
        OVERRIDE_UPDATE_EVERY = True
        debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))

    debug("started from", commands[0], "with options:", *commands[1:])

    return mods


def run():
    """
    Main program.
    Reads CONFIG_DIR/python.d.conf, parses the command line, then creates
    the supervisor and runs its check/create/update cycle.
    """
    global DEBUG_FLAG, BASE_CONFIG

    # read configuration file
    disabled = []
    configfile = CONFIG_DIR + "python.d.conf"
    debug(PROGRAM, "reading configuration file:", configfile)

    conf = read_config(configfile)
    if isinstance(conf, dict):
        # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
        if conf.get('enabled') is False:
            fatal('disabled in configuration file.\n')

        # every key is checked on its own, so one missing key does not
        # prevent the others from being applied; keys that are absent keep
        # their defaults (update_every: from NETDATA_UPDATE_EVERY)
        for param in BASE_CONFIG:
            if param in conf:
                BASE_CONFIG[param] = conf[param]

        if 'debug' in conf:
            DEBUG_FLAG = conf['debug']

        # any other key set to `no`/`false` names a disabled module
        for k, v in conf.items():
            if k in ("update_every", "debug", "enabled"):
                continue
            if v is False:
                disabled.append(k)
    elif conf is not None:
        error(configfile, "does not contain a mapping. Ignoring it.")

    # parse passed command line arguments
    modules = parse_cmdline(MODULES_DIR, *sys.argv)
    info("MODULES_DIR='" + MODULES_DIR +
         "', CONFIG_DIR='" + CONFIG_DIR +
         "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
         ", ONLY_MODULES=" + str(modules))

    # run plugins
    charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
    charts.check()
    charts.create()
    charts.update()
    fatal("finished")


if __name__ == '__main__':
    run()