]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
Merge pull request #590 from ktsaou/master
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # Description: netdata python modules supervisor
5 # Author: Pawel Krupa (paulfantom)
6
7 import os
8 import sys
9 import time
10
11
12 MODULE_EXTENSION = ".chart.py"
13 BASE_CONFIG = {'update_every': 1,
14                'priority': 90000,
15                'retries': 10}
16
17
18 # -----------------------------------------------------------------------------
19 # logging functions
20
21 PROGRAM = os.path.basename(__file__).replace(".plugin", "")
22 DEBUG_FLAG = False
23
24 def debug(*args):
25     """
26     Print message on stderr.
27     """
28     if not DEBUG_FLAG:
29         return
30     sys.stderr.write(PROGRAM + " DEBUG :")
31     for i in args:
32         sys.stderr.write(" " + str(i))
33     sys.stderr.write("\n")
34     sys.stderr.flush()
35
36 def error(*args):
37     """
38     Print message on stderr.
39     """
40     sys.stderr.write(PROGRAM + " ERROR :")
41     for i in args:
42         sys.stderr.write(" " + str(i))
43     sys.stderr.write("\n")
44     sys.stderr.flush()
45
46 def info(*args):
47     """
48     Print message on stderr.
49     """
50     sys.stderr.write(PROGRAM + " INFO :")
51     for i in args:
52         sys.stderr.write(" " + str(i))
53     sys.stderr.write("\n")
54     sys.stderr.flush()
55
56 def fatal(*args):
57     """
58     Print message on stderr and exit.
59     """
60     sys.stderr.write(PROGRAM + " FATAL :")
61     for i in args:
62         sys.stderr.write(" " + str(i))
63     sys.stderr.write("\n")
64     sys.stderr.flush()
65     sys.stdout.write('DISABLE\n')
66     sys.exit(1)
67
68
69
70 # -----------------------------------------------------------------------------
71 # globals & python modules management
72
73 # setup environment
74 # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
75 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
76                         os.path.dirname(__file__)) + "/../python.d") + "/"
77 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
78 UPDATE_EVERY = os.getenv('NETDATA_UPDATE_EVERY', 1)
79 # directories should end with '/'
80 if CONFIG_DIR[-1] != "/":
81     CONFIG_DIR += "/"
82 sys.path.append(MODULES_DIR + "python_modules")
83
84 try:
85     assert sys.version_info >= (3, 1)
86     import importlib.machinery
87
88     # change this hack below if we want PY_VERSION to be used in modules
89     # import builtins
90     # builtins.PY_VERSION = 3
91     PY_VERSION = 3
92     info('Using python v3')
93 except (AssertionError, ImportError):
94     try:
95         import imp
96
97         # change this hack below if we want PY_VERSION to be used in modules
98         # import __builtin__
99         # __builtin__.PY_VERSION = 2
100         PY_VERSION = 2
101         info('Using python v2')
102     except ImportError:
103         fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
104 try:
105     import yaml
106 except ImportError:
107     fatal('Cannot find yaml library')
108
109 class PythonCharts(object):
110     """
111     Main class used to control every python module.
112     """
113     def __init__(self,
114                  update_every=None,
115                  modules=None,
116                  modules_path='../python.d/',
117                  modules_configs='../conf.d/',
118                  modules_disabled=None):
119         """
120         :param update_every: int
121         :param modules: list
122         :param modules_path: str
123         :param modules_configs: str
124         :param modules_disabled: list
125         """
126
127         if modules is None:
128             modules = []
129         if modules_disabled is None:
130             modules_disabled = []
131
132         self.first_run = True
133         # set configuration directory
134         self.configs = modules_configs
135
136         # load modules
137         loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
138
139         # load configuration files
140         configured_modules = self._load_configs(loaded_modules)
141
142         # good economy and prosperity:
143         self.jobs = self._create_jobs(configured_modules)  # type: list
144         if DEBUG_FLAG and update_every is not None:
145             for job in self.jobs:
146                 job.create_timetable(update_every)
147
148     @staticmethod
149     def _import_module(path, name=None):
150         """
151         Try to import module using only its path.
152         :param path: str
153         :param name: str
154         :return: object
155         """
156
157         if name is None:
158             name = path.split('/')[-1]
159             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
160                 return None
161             name = name[:-len(MODULE_EXTENSION)]
162         try:
163             if PY_VERSION == 3:
164                 return importlib.machinery.SourceFileLoader(name, path).load_module()
165             else:
166                 return imp.load_source(name, path)
167         except Exception as e:
168             debug(str(e))
169             return None
170
171     def _load_modules(self, path, modules, disabled):
172         """
173         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
174         :param path: str
175         :param modules: list
176         :param disabled: list
177         :return: list
178         """
179
180         # check if plugin directory exists
181         if not os.path.isdir(path):
182             fatal("cannot find charts directory ", path)
183
184         # load modules
185         loaded = []
186         if len(modules) > 0:
187             for m in modules:
188                 if m in disabled:
189                     continue
190                 mod = self._import_module(path + m + MODULE_EXTENSION)
191                 if mod is not None:
192                     loaded.append(mod)
193                 else:  # exit if plugin is not found
194                     fatal('no modules found.')
195         else:
196             # scan directory specified in path and load all modules from there
197             names = os.listdir(path)
198             for mod in names:
199                 if mod.strip(MODULE_EXTENSION) in disabled:
200                     error(mod + ": disabled module ", mod.strip(MODULE_EXTENSION))
201                     continue
202                 m = self._import_module(path + mod)
203                 if m is not None:
204                     debug(mod + ": loading module '" + path + mod + "'")
205                     loaded.append(m)
206         return loaded
207
208     def _load_configs(self, modules):
209         """
210         Append configuration in list named `config` to every module.
211         For multi-job modules `config` list is created in _parse_config,
212         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
213         :param modules: list
214         :return: list
215         """
216         for mod in modules:
217             configfile = self.configs + mod.__name__ + ".conf"
218             if os.path.isfile(configfile):
219                 debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
220                 try:
221                     setattr(mod,
222                             'config',
223                             self._parse_config(mod, read_config(configfile)))
224                 except Exception as e:
225                     error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
226             else:
227                 error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
228                 # set config if not found
229                 if not hasattr(mod, 'config'):
230                     mod.config = {None: {}}
231                     for var in BASE_CONFIG:
232                         try:
233                             mod.config[None][var] = getattr(mod, var)
234                         except AttributeError:
235                             mod.config[None][var] = BASE_CONFIG[var]
236         return modules
237
238     @staticmethod
239     def _parse_config(module, config):
240         """
241         Parse configuration file or extract configuration from module file.
242         Example of returned dictionary:
243             config = {'name': {
244                             'update_every': 2,
245                             'retries': 3,
246                             'priority': 30000
247                             'other_val': 123}}
248         :param module: object
249         :param config: dict
250         :return: dict
251         """
252         # get default values
253         defaults = {}
254         for key in BASE_CONFIG:
255             try:
256                 # get defaults from module config
257                 defaults[key] = int(config.pop(key))
258             except (KeyError, ValueError):
259                 try:
260                     # get defaults from module source code
261                     defaults[key] = getattr(module, key)
262                 except (KeyError, ValueError):
263                     # if above failed, get defaults from global dict
264                     defaults[key] = BASE_CONFIG[key]
265
266         # check if there are dict in config dict
267         many_jobs = False
268         for name in config:
269             if type(config[name]) is dict:
270                 many_jobs = True
271                 break
272
273         # assign variables needed by supervisor to every job configuration
274         if many_jobs:
275             for name in config:
276                 for key in defaults:
277                     if key not in config[name]:
278                         config[name][key] = defaults[key]
279         # if only one job is needed, values doesn't have to be in dict (in YAML)
280         else:
281             config = {None: config.copy()}
282             config[None].update(defaults)
283
284         # return dictionary of jobs where every job has BASE_CONFIG variables
285         return config
286
287     @staticmethod
288     def _create_jobs(modules):
289         """
290         Create jobs based on module.config dictionary and module.Service class definition.
291         :param modules: list
292         :return: list
293         """
294         jobs = []
295         for module in modules:
296             for name in module.config:
297                 # register a new job
298                 conf = module.config[name]
299                 try:
300                     job = module.Service(configuration=conf, name=name)
301                 except Exception as e:
302                     error(module.__name__ +
303                           ("/" + str(name) if name is not None else "") +
304                           ": cannot start job: '" +
305                           str(e))
306                     return None
307                 else:
308                     # set chart_name (needed to plot run time graphs)
309                     job.chart_name = module.__name__
310                     if name is not None:
311                         job.chart_name += "_" + name
312                 jobs.append(job)
313                 debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
314
315         return [j for j in jobs if j is not None]
316
317     def _stop(self, job, reason=None):
318         """
319         Stop specified job and remove it from self.jobs list
320         Also notifies user about job failure if DEBUG_FLAG is set
321         :param job: object
322         :param reason: str
323         """
324         prefix = job.__module__
325         if job.name is not None:
326             prefix += "/" + job.name
327         prefix += ": "
328
329         self.jobs.remove(job)
330         if reason is None:
331             return
332         elif reason[:3] == "no ":
333             error(prefix +
334                   "does not seem to have " +
335                   reason[3:] +
336                   "() function. Disabling it.")
337         elif reason[:7] == "failed ":
338             error(prefix +
339                   reason[7:] +
340                   "() function reports failure.")
341         elif reason[:13] == "configuration":
342             error(prefix +
343                   "configuration file '" +
344                   self.configs +
345                   job.__module__ +
346                   ".conf' not found. Using defaults.")
347         elif reason[:11] == "misbehaving":
348             error(prefix + "is " + reason)
349
350     def check(self):
351         """
352         Tries to execute check() on every job.
353         This cannot fail thus it is catching every exception
354         If job.check() fails job is stopped
355         """
356         i = 0
357         while i < len(self.jobs):
358             job = self.jobs[i]
359             try:
360                 if not job.check():
361                     self._stop(job, "failed check")
362                 else:
363                     i += 1
364             except AttributeError:
365                 self._stop(job, "no check")
366             except (UnboundLocalError, Exception) as e:
367                 self._stop(job, "misbehaving. Reason: " + str(e))
368
369     def create(self):
370         """
371         Tries to execute create() on every job.
372         This cannot fail thus it is catching every exception.
373         If job.create() fails job is stopped.
374         This is also creating job run time chart.
375         """
376         i = 0
377         while i < len(self.jobs):
378             job = self.jobs[i]
379             try:
380                 if not job.create():
381                     self._stop(job, "failed create")
382                 else:
383                     chart = job.chart_name
384                     sys.stdout.write(
385                         "CHART netdata.plugin_pythond_" +
386                         chart +
387                         " '' 'Execution time for " +
388                         chart +
389                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
390                         str(job.timetable['freq']) +
391                         '\n')
392                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
393                     # sys.stdout.flush()
394                     i += 1
395             except AttributeError:
396                 self._stop(job, "no create")
397             except (UnboundLocalError, Exception) as e:
398                 self._stop(job, "misbehaving. Reason: " + str(e))
399
400     def _update_job(self, job):
401         """
402         Tries to execute update() on specified job.
403         This cannot fail thus it is catching every exception.
404         If job.update() returns False, number of retries_left is decremented.
405         If there are no more retries, job is stopped.
406         Job is also stopped if it throws an exception.
407         This is also updating job run time chart.
408         Return False if job is stopped
409         :param job: object
410         :return: boolean
411         """
412         t_start = time.time()
413         # check if it is time to execute job update() function
414         if job.timetable['next'] > t_start:
415             return True
416         try:
417             if self.first_run:
418                 since_last = 0
419             else:
420                 since_last = int((t_start - job.timetable['last']) * 1000000)
421             if not job.update(since_last):
422                 if job.retries_left <= 0:
423                     self._stop(job, "update failed")
424                     return False
425                 job.retries_left -= 1
426                 job.timetable['next'] += job.timetable['freq']
427                 return True
428         except AttributeError:
429             self._stop(job, "no update")
430             return False
431         except (UnboundLocalError, Exception) as e:
432             self._stop(job, "misbehaving. Reason: " + str(e))
433             return False
434         t_end = time.time()
435         job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
436         # draw performance graph
437         sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n')
438         sys.stdout.write("SET run_time = " + str(int((t_end - t_start) * 1000)) + '\n')
439         sys.stdout.write("END\n")
440         # sys.stdout.flush()
441         job.timetable['last'] = t_start
442         job.retries_left = job.retries
443         self.first_run = False
444         return True
445
446     def update(self):
447         """
448         Tries to execute update() on every job by using _update_job()
449         This will stay forever and ever and ever forever and ever it'll be the one...
450         """
451         self.first_run = True
452         while True:
453             next_runs = []
454             i = 0
455             while i < len(self.jobs):
456                 job = self.jobs[i]
457                 if self._update_job(job):
458                     try:
459                         next_runs.append(job.timetable['next'])
460                     except KeyError:
461                         pass
462                     i += 1
463             if len(next_runs) == 0:
464                 fatal('no python.d modules loaded.')
465             time.sleep(min(next_runs) - time.time())
466
467
468 def read_config(path):
469     """
470     Read YAML configuration from specified file
471     :param path: str
472     :return: dict
473     """
474     try:
475         with open(path, 'r') as stream:
476             config = yaml.load(stream)
477     except (OSError, IOError):
478         error(str(path), "is not a valid configuration file")
479         return None
480     except yaml.YAMLError as e:
481         error(str(path), "is malformed:", e)
482         return None
483     return config
484
485
486 def parse_cmdline(directory, *commands):
487     """
488     Parse parameters from command line.
489     :param directory: str
490     :param commands: list of str
491     :return: dict
492     """
493     global DEBUG_FLAG
494     global UPDATE_EVERY
495     update_every = UPDATE_EVERY
496
497     mods = []
498     for cmd in commands[1:]:
499         if cmd == "check":
500             pass
501         elif cmd == "debug" or cmd == "all":
502             DEBUG_FLAG = True
503             # redirect stderr to stdout?
504         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
505             DEBUG_FLAG = True
506             mods.append(cmd.replace(".chart.py", ""))
507         else:
508             try:
509                 update_every = int(cmd)
510             except ValueError:
511                 update_every = UPDATE_EVERY
512
513     debug("started from", commands[0], "with options:", *commands[1:])
514
515     UPDATE_EVERY = update_every
516     return {'modules': mods}
517
518
519 # if __name__ == '__main__':
520 def run():
521     """
522     Main program.
523     """
524     global PROGRAM, DEBUG_FLAG
525     PROGRAM = sys.argv[0].split('/')[-1].split('.plugin')[0]
526
527     # read configuration file
528     disabled = []
529     configfile = CONFIG_DIR + "python.d.conf"
530
531     update_every = UPDATE_EVERY
532     conf = read_config(configfile)
533     if conf is not None:
534         try:
535             # FIXME: is this right? exit the whole plugin?
536             if str(conf['enable']) is False:
537                 fatal('disabled in configuration file.\n')
538         except (KeyError, TypeError):
539             pass
540         try:
541             update_every = conf['update_every']
542         except (KeyError, TypeError):
543             pass  # use default update_every from NETDATA_UPDATE_EVERY
544         try:
545             DEBUG_FLAG = conf['debug']
546         except (KeyError, TypeError):
547             pass
548         for k, v in conf.items():
549             if k in ("update_every", "debug", "enable"):
550                 continue
551             if v is False:
552                 disabled.append(k)
553
554     # parse passed command line arguments
555     out = parse_cmdline(MODULES_DIR, *sys.argv)
556     modules = out['modules']
557     info("MODULES_DIR='" + MODULES_DIR + "', CONFIG_DIR='" + CONFIG_DIR + "', UPDATE_EVERY=" + str(UPDATE_EVERY) + ", ONLY_MODULES=" + str(out['modules']))
558
559     # run plugins
560     charts = PythonCharts(update_every, modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
561     charts.check()
562     charts.create()
563     charts.update()
564     fatal("finished")
565
566
567 if __name__ == '__main__':
568     run()