import os
import sys
import time
+import threading
# -----------------------------------------------------------------------------
# globals & environment setup
'retries': 10}
MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
- os.path.dirname(__file__)) + "/../python.d") + "/"
+ os.path.dirname(__file__)) + "/../python.d") + "/"
CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
# directories should end with '/'
if CONFIG_DIR[-1] != "/":
DEBUG_FLAG = False
OVERRIDE_UPDATE_EVERY = False
-
# -----------------------------------------------------------------------------
-# logging functions
-def log_msg(msg_type, *args):
- """
- Print message on stderr.
- :param msg_type: str
- """
- sys.stderr.write(PROGRAM)
- sys.stderr.write(" ")
- sys.stderr.write(msg_type)
- sys.stderr.write(": ")
- for i in args:
- sys.stderr.write(" ")
- sys.stderr.write(str(i))
- sys.stderr.write("\n")
- sys.stderr.flush()
-
-
-def debug(*args):
- """
- Print debug message on stderr.
- """
- if not DEBUG_FLAG:
- return
-
- log_msg("DEBUG", *args)
-
-
-def error(*args):
- """
- Print message on stderr.
- """
- log_msg("ERROR", *args)
-
-
-def info(*args):
- """
- Print message on stderr.
- """
- log_msg("INFO", *args)
-
-
-def fatal(*args):
- """
- Print message on stderr and exit.
- """
- log_msg("FATAL", *args)
- sys.stdout.write('DISABLE\n')
- sys.exit(1)
+# custom, third party and version specific python modules management
+import msg
-
-# -----------------------------------------------------------------------------
-# third party and version specific python modules management
try:
assert sys.version_info >= (3, 1)
import importlib.machinery
# import builtins
# builtins.PY_VERSION = 3
PY_VERSION = 3
- info('Using python v3')
+ msg.info('Using python v3')
except (AssertionError, ImportError):
try:
import imp
# import __builtin__
# __builtin__.PY_VERSION = 2
PY_VERSION = 2
- info('Using python v2')
+ msg.info('Using python v2')
except ImportError:
- fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
+ msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
try:
import yaml
except ImportError:
- fatal('Cannot find yaml library')
+ msg.fatal('Cannot find yaml library')
class PythonCharts(object):
"""
Main class used to control every python module.
"""
+
def __init__(self,
modules=None,
modules_path='../python.d/',
else:
return imp.load_source(name, path)
except Exception as e:
- error("Problem loading", name, str(e))
+ msg.error("Problem loading", name, str(e))
return None
def _load_modules(self, path, modules, disabled):
# check if plugin directory exists
if not os.path.isdir(path):
- fatal("cannot find charts directory ", path)
+ msg.fatal("cannot find charts directory ", path)
# load modules
loaded = []
if mod is not None:
loaded.append(mod)
else: # exit if plugin is not found
- fatal('no modules found.')
+ msg.fatal('no modules found.')
else:
# scan directory specified in path and load all modules from there
names = os.listdir(path)
for mod in names:
if mod.replace(MODULE_EXTENSION, "") in disabled:
- error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
+ msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
continue
m = self._import_module(path + mod)
if m is not None:
- debug(mod + ": loading module '" + path + mod + "'")
+ msg.debug(mod + ": loading module '" + path + mod + "'")
loaded.append(m)
return loaded
for mod in modules:
configfile = self.configs + mod.__name__ + ".conf"
if os.path.isfile(configfile):
- debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
+ msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
try:
if not hasattr(mod, 'config'):
mod.config = {}
'config',
self._parse_config(mod, read_config(configfile)))
except Exception as e:
- error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
+ msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
else:
- error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
+ msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
# set config if not found
if not hasattr(mod, 'config'):
- debug(mod.__name__ + ": setting configuration for only one job")
+ msg.debug(mod.__name__ + ": setting configuration for only one job")
mod.config = {None: {}}
for var in BASE_CONFIG:
try:
"""
# get default values
defaults = {}
- debug(module.__name__ + ": reading configuration")
+ msg.debug(module.__name__ + ": reading configuration")
for key in BASE_CONFIG:
try:
# get defaults from module config
try:
job = module.Service(configuration=conf, name=name)
except Exception as e:
- error(module.__name__ +
- ("/" + str(name) if name is not None else "") +
- ": cannot start job: '" +
- str(e))
+ msg.error(module.__name__ +
+ ("/" + str(name) if name is not None else "") +
+ ": cannot start job: '" +
+ str(e))
return None
else:
# set chart_name (needed to plot run time graphs)
if name is not None:
job.chart_name += "_" + name
jobs.append(job)
- debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
+ msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
return [j for j in jobs if j is not None]
if reason is None:
return
elif reason[:3] == "no ":
- error(prefix +
- "does not seem to have " +
- reason[3:] +
- "() function. Disabling it.")
+ msg.error(prefix +
+ "does not seem to have " +
+ reason[3:] +
+ "() function. Disabling it.")
elif reason[:7] == "failed ":
- error(prefix +
- reason[7:] +
- "() function reports failure.")
+ msg.error(prefix +
+ reason[7:] +
+ "() function reports failure.")
elif reason[:13] == "configuration":
- error(prefix +
- "configuration file '" +
- self.configs +
- job.__module__ +
- ".conf' not found. Using defaults.")
+ msg.error(prefix +
+ "configuration file '" +
+ self.configs +
+ job.__module__ +
+ ".conf' not found. Using defaults.")
elif reason[:11] == "misbehaving":
- error(prefix + "is " + reason)
+ msg.error(prefix + "is " + reason)
def check(self):
"""
if not job.check():
self._stop(job, "failed check")
else:
- debug(job.chart_name, ": check succeeded")
+ msg.debug(job.chart_name, ": check succeeded")
i += 1
except AttributeError:
self._stop(job, "no check")
str(job.timetable['freq']) +
'\n')
sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
- debug("created charts for", job.chart_name)
+ msg.debug("created charts for", job.chart_name)
# sys.stdout.flush()
i += 1
except AttributeError:
except (UnboundLocalError, Exception) as e:
self._stop(job, "misbehaving. Reason: " + str(e))
- def _update_job(self, job):
- """
- Tries to execute update() on specified job.
- This cannot fail thus it is catching every exception.
- If job.update() returns False, number of retries_left is decremented.
- If there are no more retries, job is stopped.
- Job is also stopped if it throws an exception.
- This is also updating job run time chart.
- Return False if job is stopped
- :param job: object
- :return: boolean
- """
- t_start = time.time()
- # check if it is time to execute job update() function
- if job.timetable['next'] > t_start:
- debug(job.chart_name + " will be run in " + str(int((job.timetable['next'] - t_start) * 1000)) + " ms")
- return True
- try:
- if self.first_run:
- since_last = 0
- else:
- since_last = int((t_start - job.timetable['last']) * 1000000)
- debug(job.chart_name +
- " ready to run, after " + str(int((t_start - job.timetable['last']) * 1000)) +
- " ms (update_every: " + str(job.timetable['freq'] * 1000) +
- " ms, latency: " + str(int((t_start - job.timetable['next']) * 1000)) + " ms)")
- if not job.update(since_last):
- if job.retries_left <= 0:
- self._stop(job, "update failed")
- return False
- job.retries_left -= 1
- job.timetable['next'] += job.timetable['freq']
- return True
- except AttributeError:
- self._stop(job, "no update")
- return False
- except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
- return False
- t_end = time.time()
- job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
- # draw performance graph
- run_time = str(int((t_end - t_start) * 1000))
- debug(job.chart_name, "updated in", run_time, "ms")
- sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n')
- sys.stdout.write("SET run_time = " + run_time + '\n')
- sys.stdout.write("END\n")
- # sys.stdout.flush()
- job.timetable['last'] = t_start
- job.retries_left = job.retries
- self.first_run = False
- return True
-
def update(self):
- """
- Tries to execute update() on every job by using _update_job()
- This will stay forever and ever and ever forever and ever it'll be the one...
- """
- self.first_run = True
+ for job in self.jobs:
+ job.start()
+
while True:
- next_runs = []
- i = 0
- while i < len(self.jobs):
- job = self.jobs[i]
- if self._update_job(job):
- try:
- next_runs.append(job.timetable['next'])
- except KeyError:
- pass
- i += 1
- if len(next_runs) == 0:
- fatal('no python.d modules loaded.')
- try:
- time.sleep(min(next_runs) - time.time())
- except IOError:
- pass
+ if threading.active_count() <= 1:
+ msg.fatal("no more jobs")
+ time.sleep(1)
def read_config(path):
with open(path, 'r') as stream:
config = yaml.load(stream)
except (OSError, IOError):
- error(str(path), "is not a valid configuration file")
+ msg.error(str(path), "is not a valid configuration file")
return None
except yaml.YAMLError as e:
- error(str(path), "is malformed:", e)
+ msg.error(str(path), "is malformed:", e)
return None
return config
pass
if changed_update and DEBUG_FLAG:
OVERRIDE_UPDATE_EVERY = True
- debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
+ msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
- debug("started from", commands[0], "with options:", *commands[1:])
+ msg.debug("started from", commands[0], "with options:", *commands[1:])
return mods
# read configuration file
disabled = []
configfile = CONFIG_DIR + "python.d.conf"
- debug(PROGRAM, "reading configuration file:", configfile)
+ msg.info(PROGRAM, "reading configuration file:", configfile)
conf = read_config(configfile)
if conf is not None:
try:
# exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
if conf['enabled'] is False:
- fatal('disabled in configuration file.\n')
+ msg.fatal('disabled in configuration file.\n')
except (KeyError, TypeError):
pass
try:
if v is False:
disabled.append(k)
+ DEBUG_FLAG = True
+ msg.DEBUG_FLAG = DEBUG_FLAG
+ msg.PROGRAM = PROGRAM
# parse passed command line arguments
modules = parse_cmdline(MODULES_DIR, *sys.argv)
- info("MODULES_DIR='" + MODULES_DIR +
- "', CONFIG_DIR='" + CONFIG_DIR +
- "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
- ", ONLY_MODULES=" + str(modules))
+ msg.info("MODULES_DIR='" + MODULES_DIR +
+ "', CONFIG_DIR='" + CONFIG_DIR +
+ "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
+ ", ONLY_MODULES=" + str(modules))
# run plugins
charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
charts.check()
charts.create()
charts.update()
- fatal("finished")
+ msg.fatal("finished")
if __name__ == '__main__':
# Description: prototypes for netdata python.d modules
# Author: Pawel Krupa (paulfantom)
-from time import time
+import time
import sys
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
+import threading
+import msg
-class BaseService(object):
+
+class BaseService(threading.Thread):
"""
Prototype of Service class.
Implemented basic functionality to run jobs by `python.d.plugin`
"""
+ debugging = False
+
def __init__(self, configuration=None, name=None):
"""
This needs to be initialized in child classes
:param configuration: dict
:param name: str
"""
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.retries = 0
+ self.retries_left = 0
+ self.priority = 140000
+ self.update_every = 1
self.name = name
if configuration is None:
self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
"""
if freq is None:
freq = self.update_every
- now = time()
+ now = time.time()
self.timetable = {'last': now,
'next': now - (now % freq) + freq,
'freq': freq}
- @staticmethod
- def error(msg, exception=""):
- if exception != "":
- exception = " " + str(exception).replace("\n", " ")
- sys.stderr.write(str(msg)+exception+"\n")
- sys.stderr.flush()
+ def _run_once(self):
+ t_start = time.time()
+ # check if it is time to execute job update() function
+ if self.timetable['next'] > t_start:
+ msg.debug(self.chart_name + " will be run in " +
+ str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
+ return True
+
+ since_last = int((t_start - self.timetable['last']) * 1000000)
+ msg.debug(self.chart_name +
+ " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
+ " ms (update_every: " + str(self.timetable['freq'] * 1000) +
+ " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
+ if not self.update(since_last):
+ return False
+ t_end = time.time()
+ self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
+
+ # draw performance graph
+ run_time = str(int((t_end - t_start) * 1000))
+ run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
+ run_time_chart += "SET run_time = " + run_time + '\n'
+ run_time_chart += "END\n"
+ sys.stdout.write(run_time_chart)
+ msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
+ self.timetable['last'] = t_start
+ return True
+
+ def run(self):
+ self.timetable['last'] = time.time()
+ while True:
+ try:
+ status = self._run_once()
+ except Exception as e:
+ msg.error("Something wrong: " + str(e))
+ return
+ if status:
+ time.sleep(self.timetable['next'] - time.time())
+ self.retries_left = self.retries
+ else:
+ self.retries -= 1
+ if self.retries_left <= 0:
+ msg.error("no more retries. Exiting")
+ return
+ else:
+ time.sleep(self.timetable['freq'])
def check(self):
"""
check() prototype
:return: boolean
"""
- self.error("Service " + str(self.__module__) + "doesn't implement check() function")
+ msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
return False
def create(self):
create() prototype
:return: boolean
"""
- self.error("Service " + str(self.__module__) + "doesn't implement create() function?")
+ msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
return False
def update(self, interval):
:param interval: int
:return: boolean
"""
- self.error("Service " + str(self.__module__) + "doesn't implement update() function")
+ msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
return False
f = urlopen(self.url, timeout=self.update_every)
raw = f.read().decode('utf-8')
except Exception as e:
- self.error(self.__module__, str(e))
+ msg.error(self.__module__, str(e))
finally:
try:
f.close()
Format configuration data and try to connect to server
:return: boolean
"""
- if self.name is None:
+ if self.name is None or self.name == str(None):
self.name = 'local'
else:
self.name = str(self.name)
content += "DIMENSION " + line['name'] + " " + line['options'] + "\n"
if len(content) > 0:
- print(header)
- print(content)
+ print(header + "\n" + content)
idx += 1
if idx == 0:
except KeyError:
pass
if len(c) != 0:
- print(header + c)
- print("END")
+ print(header + c + "\nEND")
return True