-#!/usr/bin/env python3
+#!/usr/bin/env bash
+'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
+# -*- coding: utf-8 -*-
+
+# Description: netdata python modules supervisor
+# Author: Pawel Krupa (paulfantom)
import os
import sys
import time
+import threading
+
+# -----------------------------------------------------------------------------
+# globals & environment setup
+# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
+MODULE_EXTENSION = ".chart.py"
+BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
+ 'priority': 90000,
+ 'retries': 10}
+
+MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
+ os.path.dirname(__file__)) + "/../python.d") + "/"
+CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
+# directories should end with '/'
+if CONFIG_DIR[-1] != "/":
+ CONFIG_DIR += "/"
+sys.path.append(MODULES_DIR + "python_modules")
+
+PROGRAM = os.path.basename(__file__).replace(".plugin", "")
+DEBUG_FLAG = False
+OVERRIDE_UPDATE_EVERY = False
+
+# -----------------------------------------------------------------------------
+# custom, third party and version specific python modules management
+import msg
try:
assert sys.version_info >= (3, 1)
import importlib.machinery
-except AssertionError:
- sys.stderr.write('python.d.plugin: Not supported python version. Needed python >= 3.1\n')
- sys.stdout.write('DISABLE\n')
- sys.exit(1)
+ PY_VERSION = 3
+ # change this hack below if we want PY_VERSION to be used in modules
+ # import builtins
+ # builtins.PY_VERSION = 3
+ msg.info('Using python v3')
+except (AssertionError, ImportError):
+ try:
+ import imp
+
+ # change this hack below if we want PY_VERSION to be used in modules
+ # import __builtin__
+ # __builtin__.PY_VERSION = 2
+ PY_VERSION = 2
+ msg.info('Using python v2')
+ except ImportError:
+ msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
+# try:
+# import yaml
+# except ImportError:
+# msg.fatal('Cannot find yaml library')
try:
- import yaml
+ if PY_VERSION == 3:
+ import pyyaml3 as yaml
+ else:
+ import pyyaml2 as yaml
except ImportError:
- sys.stderr.write('python.d.plugin: Cannot find yaml library\n')
- sys.stdout.write('DISABLE\n')
- sys.exit(1)
-
-DEBUG_FLAG = False
-PROGRAM = "python.d.plugin"
-MODULE_EXTENSION = ".chart.py"
-BASE_CONFIG = {'update_every': 10,
- 'priority': 12345,
- 'retries': 0}
+ msg.fatal('Cannot find yaml library')
class PythonCharts(object):
+ """
+ Main class used to control every python module.
+ """
+
def __init__(self,
- interval=None,
- modules=[],
+ modules=None,
modules_path='../python.d/',
modules_configs='../conf.d/',
- modules_disabled=[]):
+ modules_disabled=None):
+ """
+ :param modules: list
+ :param modules_path: str
+ :param modules_configs: str
+ :param modules_disabled: list
+ """
+
+ if modules is None:
+ modules = []
+ if modules_disabled is None:
+ modules_disabled = []
+
self.first_run = True
# set configuration directory
self.configs = modules_configs
configured_modules = self._load_configs(loaded_modules)
# good economy and prosperity:
- self.jobs = self._create_jobs(configured_modules)
- if DEBUG_FLAG and interval is not None:
- for job in self.jobs:
- job.create_timetable(interval)
+ self.jobs = self._create_jobs(configured_modules) # type: list
- @staticmethod
- def _create_jobs(modules):
- # module store a definition of Service class
- # module store configuration in module.config
- # configs are list of dicts or a dict
- # one dict is one service
- # iterate over list of modules and inside one loop iterate over configs
- jobs = []
- for module in modules:
- for name in module.config:
- # register a new job
- conf = module.config[name]
- try:
- job = module.Service(configuration=conf, name=name)
- except Exception as e:
- debug(module.__name__ +
- ": Couldn't start job named " +
- str(name) +
- ": " +
- str(e))
- return None
- else:
- # set execution_name (needed to plot run time graphs)
- job.execution_name = module.__name__
- if name is not None:
- job.execution_name += "_" + name
- jobs.append(job)
-
- return [j for j in jobs if j is not None]
+ # enable timetable override like `python.d.plugin mysql debug 1`
+ if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
+ for job in self.jobs:
+ job.create_timetable(BASE_CONFIG['update_every'])
@staticmethod
def _import_module(path, name=None):
- # try to import module using only its path
+ """
+ Try to import module using only its path.
+ :param path: str
+ :param name: str
+ :return: object
+ """
+
if name is None:
name = path.split('/')[-1]
if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
return None
name = name[:-len(MODULE_EXTENSION)]
try:
- return importlib.machinery.SourceFileLoader(name, path).load_module()
+ if PY_VERSION == 3:
+ return importlib.machinery.SourceFileLoader(name, path).load_module()
+ else:
+ return imp.load_source(name, path)
except Exception as e:
- debug(str(e))
+ msg.error("Problem loading", name, str(e))
return None
def _load_modules(self, path, modules, disabled):
+ """
+ Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
+ :param path: str
+ :param modules: list
+ :param disabled: list
+ :return: list
+ """
+
# check if plugin directory exists
if not os.path.isdir(path):
- debug("cannot find charts directory ", path)
- sys.stdout.write("DISABLE\n")
- sys.exit(1)
+ msg.fatal("cannot find charts directory ", path)
# load modules
loaded = []
mod = self._import_module(path + m + MODULE_EXTENSION)
if mod is not None:
loaded.append(mod)
+ else: # exit if plugin is not found
+ msg.fatal('no modules found.')
else:
# scan directory specified in path and load all modules from there
names = os.listdir(path)
for mod in names:
- if mod.strip(MODULE_EXTENSION) in disabled:
- debug("disabling:", mod.strip(MODULE_EXTENSION))
+ if mod.replace(MODULE_EXTENSION, "") in disabled:
+ msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
continue
m = self._import_module(path + mod)
if m is not None:
- debug("loading chart: '" + path + mod + "'")
+ msg.debug(mod + ": loading module '" + path + mod + "'")
loaded.append(m)
return loaded
def _load_configs(self, modules):
- # function loads configuration files to modules
+ """
+ Append configuration in list named `config` to every module.
+ For multi-job modules `config` list is created in _parse_config,
+ otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
+ :param modules: list
+ :return: list
+ """
for mod in modules:
configfile = self.configs + mod.__name__ + ".conf"
if os.path.isfile(configfile):
- debug("loading chart options: '" + configfile + "'")
+ msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
try:
+ if not hasattr(mod, 'config'):
+ mod.config = {}
setattr(mod,
'config',
self._parse_config(mod, read_config(configfile)))
except Exception as e:
- debug("something went wrong while loading configuration", e)
+ msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
else:
- debug(mod.__name__ +
- ": configuration file '" +
- configfile +
- "' not found. Using defaults.")
+ msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
# set config if not found
if not hasattr(mod, 'config'):
+ msg.debug(mod.__name__ + ": setting configuration for only one job")
mod.config = {None: {}}
for var in BASE_CONFIG:
try:
@staticmethod
def _parse_config(module, config):
+ """
+ Parse configuration file or extract configuration from module file.
+ Example of returned dictionary:
+ config = {'name': {
+ 'update_every': 2,
+ 'retries': 3,
+ 'priority': 30000
+ 'other_val': 123}}
+ :param module: object
+ :param config: dict
+ :return: dict
+ """
+ if config is None:
+ config = {}
# get default values
defaults = {}
+ msg.debug(module.__name__ + ": reading configuration")
for key in BASE_CONFIG:
try:
# get defaults from module config
try:
# get defaults from module source code
defaults[key] = getattr(module, key)
- except (KeyError, ValueError):
+ except (KeyError, ValueError, AttributeError):
# if above failed, get defaults from global dict
defaults[key] = BASE_CONFIG[key]
# return dictionary of jobs where every job has BASE_CONFIG variables
return config
+ @staticmethod
+ def _create_jobs(modules):
+ """
+ Create jobs based on module.config dictionary and module.Service class definition.
+ :param modules: list
+ :return: list
+ """
+ jobs = []
+ for module in modules:
+ for name in module.config:
+ # register a new job
+ conf = module.config[name]
+ try:
+ job = module.Service(configuration=conf, name=name)
+ except Exception as e:
+ msg.error(module.__name__ +
+ ("/" + str(name) if name is not None else "") +
+ ": cannot start job: '" +
+ str(e))
+ return None
+ else:
+ # set chart_name (needed to plot run time graphs)
+ job.chart_name = module.__name__
+ if name is not None:
+ job.chart_name += "_" + name
+ jobs.append(job)
+ msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
+
+ return [j for j in jobs if j is not None]
+
def _stop(self, job, reason=None):
- # modifies self.jobs
- self.jobs.remove(job)
+ """
+ Stop specified job and remove it from self.jobs list
+ Also notifies user about job failure if DEBUG_FLAG is set
+ :param job: object
+ :param reason: str
+ """
+ prefix = job.__module__
+ if job.name is not None and len(job.name) != 0:
+ prefix += "/" + job.name
+ try:
+ msg.error("DISABLED:", prefix)
+ self.jobs.remove(job)
+ except Exception as e:
+ msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
+
+ # TODO remove section below and remove `reason`.
+ prefix += ": "
if reason is None:
return
elif reason[:3] == "no ":
- debug("chart '" +
- job.execution_name,
- "' does not seem to have " +
- reason[3:] +
- "() function. Disabling it.")
+ msg.error(prefix +
+ "does not seem to have " +
+ reason[3:] +
+ "() function. Disabling it.")
elif reason[:7] == "failed ":
- debug("chart '" +
- job.execution_name + "' " +
- reason[7:] +
- "() function reports failure.")
+ msg.error(prefix +
+ reason[7:] +
+ "() function reports failure.")
elif reason[:13] == "configuration":
- debug(job.execution_name,
- "configuration file '" +
- self.configs +
- job.execution_name +
- ".conf' not found. Using defaults.")
+ msg.error(prefix +
+ "configuration file '" +
+ self.configs +
+ job.__module__ +
+ ".conf' not found. Using defaults.")
elif reason[:11] == "misbehaving":
- debug(job.execution_name, "is " + reason)
+ msg.error(prefix + "is " + reason)
def check(self):
- # try to execute check() on every job
- for job in self.jobs:
+ """
+ Tries to execute check() on every job.
+ This cannot fail thus it is catching every exception
+ If job.check() fails job is stopped
+ """
+ i = 0
+ overridden = []
+ msg.debug("all job objects", str(self.jobs))
+ while i < len(self.jobs):
+ job = self.jobs[i]
try:
if not job.check():
- self._stop(job, "failed check")
- except AttributeError:
- self._stop(job, "no check")
+ msg.error(job.chart_name, "check function failed.")
+ self._stop(job)
+ else:
+ msg.info("CHECKED OK:", job.chart_name)
+ i += 1
+ try:
+ if job.override_name is not None:
+ new_name = job.__module__ + '_' + job.override_name
+ if new_name in overridden:
+ msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.")
+ self._stop(job)
+ i -= 1
+ else:
+ job.name = job.override_name
+ msg.info("RENAMED:", new_name, ", from " + job.chart_name)
+ job.chart_name = new_name
+ overridden.append(job.chart_name)
+ except Exception:
+ pass
+ except AttributeError as e:
+ self._stop(job)
+ msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
+ msg.debug(str(e))
except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
+ msg.debug("overridden job names:", str(overridden))
+ msg.debug("all remaining job objects:", str(self.jobs))
def create(self):
- # try to execute create() on every job
- for job in self.jobs:
+ """
+ Tries to execute create() on every job.
+ This cannot fail thus it is catching every exception.
+ If job.create() fails job is stopped.
+ This is also creating job run time chart.
+ """
+ i = 0
+ while i < len(self.jobs):
+ job = self.jobs[i]
try:
if not job.create():
- self._stop(job, "failed create")
+ msg.error(job.chart_name, "create function failed.")
+ self._stop(job)
else:
- chart = job.execution_name
+ chart = job.chart_name
sys.stdout.write(
"CHART netdata.plugin_pythond_" +
chart +
str(job.timetable['freq']) +
'\n')
sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
- sys.stdout.flush()
+ msg.debug("created charts for", job.chart_name)
+ # sys.stdout.flush()
+ i += 1
except AttributeError:
- self._stop(job, "no create")
+ msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
+ self._stop(job)
except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
-
- def _update_job(self, job):
- # try to execute update() on every job and draw run time graph
- t_start = time.time()
- # check if it is time to execute job update() function
- if job.timetable['next'] > t_start:
- return
- try:
- if self.first_run:
- since_last = 0
- else:
- since_last = int((t_start - job.timetable['last']) * 1000000)
- if not job.update(since_last):
- self._stop(job, "update failed")
- return
- except AttributeError:
- self._stop(job, "no update")
- return
- except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
- return
- t_end = time.time()
- job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
- # draw performance graph
- sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.execution_name + " " + str(since_last) + '\n')
- sys.stdout.write("SET run_time = " + str(int((t_end - t_start) * 1000)) + '\n')
- sys.stdout.write("END\n")
- sys.stdout.flush()
- job.timetable['last'] = t_start
- self.first_run = False
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
def update(self):
- # run updates (this will stay forever and ever and ever forever and ever it'll be the one...)
- self.first_run = True
+ """
+ Creates and supervises every job thread.
+ This will stay forever and ever and ever forever and ever it'll be the one...
+ """
+ for job in self.jobs:
+ job.start()
+
while True:
- next_runs = []
- for job in self.jobs:
- self._update_job(job)
- try:
- next_runs.append(job.timetable['next'])
- except KeyError:
- pass
- if len(next_runs) == 0:
- debug("No plugins loaded")
- sys.stdout.write("DISABLE\n")
- sys.exit(1)
- time.sleep(min(next_runs) - time.time())
+ if threading.active_count() <= 1:
+ msg.fatal("no more jobs")
+ time.sleep(1)
def read_config(path):
+ """
+ Read YAML configuration from specified file
+ :param path: str
+ :return: dict
+ """
try:
with open(path, 'r') as stream:
config = yaml.load(stream)
- except IsADirectoryError:
- debug(str(path), "is a directory")
+ except (OSError, IOError):
+ msg.error(str(path), "is not a valid configuration file")
return None
except yaml.YAMLError as e:
- debug(str(path), "is malformed:", e)
+ msg.error(str(path), "is malformed:", e)
return None
return config
-def debug(*args):
- if not DEBUG_FLAG:
- return
- sys.stderr.write(PROGRAM + ":")
- for i in args:
- sys.stderr.write(" " + str(i))
- sys.stderr.write("\n")
- sys.stderr.flush()
-
-
def parse_cmdline(directory, *commands):
+ """
+ Parse parameters from command line.
+ :param directory: str
+ :param commands: list of str
+ :return: dict
+ """
global DEBUG_FLAG
- interval = None
+ global OVERRIDE_UPDATE_EVERY
+ global BASE_CONFIG
+ changed_update = False
mods = []
for cmd in commands[1:]:
if cmd == "check":
DEBUG_FLAG = True
# redirect stderr to stdout?
elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
- DEBUG_FLAG = True
+ #DEBUG_FLAG = True
mods.append(cmd.replace(".chart.py", ""))
else:
try:
- interval = int(cmd)
+ BASE_CONFIG['update_every'] = int(cmd)
+ changed_update = True
except ValueError:
pass
+ if changed_update and DEBUG_FLAG:
+ OVERRIDE_UPDATE_EVERY = True
+ msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
- debug("started from", commands[0], "with options:", *commands[1:])
- if len(mods) == 0 and DEBUG_FLAG is False:
- interval = None
+ msg.debug("started from", commands[0], "with options:", *commands[1:])
- return {'interval': interval,
- 'modules': mods}
+ return mods
# if __name__ == '__main__':
def run():
- global DEBUG_FLAG, PROGRAM
- DEBUG_FLAG = True
- PROGRAM = sys.argv[0].split('/')[-1].split('.plugin')[0]
- # parse env variables
- # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
- main_dir = os.getenv('NETDATA_PLUGINS_DIR',
- os.path.abspath(__file__).strip("python.d.plugin.py"))
- config_dir = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
- interval = os.getenv('NETDATA_UPDATE_EVERY', None)
+ """
+ Main program.
+ """
+ global DEBUG_FLAG, BASE_CONFIG
# read configuration file
disabled = []
- if config_dir[-1] != '/':
- config_dir += '/'
- configfile = config_dir + "python.d.conf"
-
- try:
- conf = read_config(configfile)
- print(conf)
+ configfile = CONFIG_DIR + "python.d.conf"
+ msg.PROGRAM = PROGRAM
+ msg.info("reading configuration file:", configfile)
+ log_throttle = 200
+ log_interval = 3600
+
+ conf = read_config(configfile)
+ if conf is not None:
try:
- if str(conf['enable']) is False:
- debug("disabled in configuration file")
- sys.stdout.write("DISABLE\n")
- sys.exit(1)
+ # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
+ if conf['enabled'] is False:
+ msg.fatal('disabled in configuration file.\n')
except (KeyError, TypeError):
pass
try:
- modules_conf = conf['plugins_config_dir']
- except KeyError:
- modules_conf = config_dir + "python.d/" # default configuration directory
+ for param in BASE_CONFIG:
+ BASE_CONFIG[param] = conf[param]
+ except (KeyError, TypeError):
+ pass # use default update_every from NETDATA_UPDATE_EVERY
try:
- modules_dir = conf['plugins_dir']
- except KeyError:
- modules_dir = main_dir.replace("plugins.d", "python.d")
+ DEBUG_FLAG = conf['debug']
+ except (KeyError, TypeError):
+ pass
try:
- interval = conf['interval']
- except KeyError:
- pass # use default interval from NETDATA_UPDATE_EVERY
+ log_throttle = conf['logs_per_interval']
+ except (KeyError, TypeError):
+ pass
try:
- DEBUG_FLAG = conf['debug']
- except KeyError:
+ log_interval = conf['log_interval']
+ except (KeyError, TypeError):
pass
for k, v in conf.items():
- if k in ("plugins_config_dir", "plugins_dir", "interval", "debug"):
+ if k in ("update_every", "debug", "enabled"):
continue
if v is False:
disabled.append(k)
- except FileNotFoundError:
- modules_conf = config_dir + "python.d/"
- modules_dir = main_dir.replace("plugins.d", "python.d")
-
- # directories should end with '/'
- if modules_dir[-1] != '/':
- modules_dir += "/"
- if modules_conf[-1] != '/':
- modules_conf += "/"
# parse passed command line arguments
- out = parse_cmdline(modules_dir, *sys.argv)
- modules = out['modules']
- if out['interval'] is not None:
- interval = out['interval']
-
- # configure environment to run modules
- sys.path.append(modules_dir + "python_modules") # append path to directory with modules dependencies
+ modules = parse_cmdline(MODULES_DIR, *sys.argv)
+ msg.DEBUG_FLAG = DEBUG_FLAG
+ msg.LOG_THROTTLE = log_throttle
+ msg.LOG_INTERVAL = log_interval
+ msg.LOG_COUNTER = 0
+ msg.LOG_NEXT_CHECK = 0
+ msg.info("MODULES_DIR='" + MODULES_DIR +
+ "', CONFIG_DIR='" + CONFIG_DIR +
+ "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
+ ", ONLY_MODULES=" + str(modules))
# run plugins
- charts = PythonCharts(interval, modules, modules_dir, modules_conf, disabled)
+ charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
charts.check()
charts.create()
charts.update()
- sys.stdout.write("DISABLE")
+ msg.fatal("finished")
if __name__ == '__main__':