]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
Merge pull request #1373 from jimcooley/master
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env bash
2 '''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
3 # -*- coding: utf-8 -*-
4
5 # Description: netdata python modules supervisor
6 # Author: Pawel Krupa (paulfantom)
7
8 import os
9 import sys
10 import time
11 import threading
12
13 # -----------------------------------------------------------------------------
14 # globals & environment setup
15 # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
16 MODULE_EXTENSION = ".chart.py"
17 BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
18                'priority': 90000,
19                'retries': 10}
20
21 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
22                                         os.path.dirname(__file__)) + "/../python.d") + "/"
23 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
24 # directories should end with '/'
25 if CONFIG_DIR[-1] != "/":
26     CONFIG_DIR += "/"
27 sys.path.append(MODULES_DIR + "python_modules")
28
29 PROGRAM = os.path.basename(__file__).replace(".plugin", "")
30 DEBUG_FLAG = False
31 TRACE_FLAG = False
32 OVERRIDE_UPDATE_EVERY = False
33
34 # -----------------------------------------------------------------------------
35 # custom, third party and version specific python modules management
36 import msg
37
38 try:
39     assert sys.version_info >= (3, 1)
40     import importlib.machinery
41     PY_VERSION = 3
42     # change this hack below if we want PY_VERSION to be used in modules
43     # import builtins
44     # builtins.PY_VERSION = 3
45     msg.info('Using python v3')
46 except (AssertionError, ImportError):
47     try:
48         import imp
49
50         # change this hack below if we want PY_VERSION to be used in modules
51         # import __builtin__
52         # __builtin__.PY_VERSION = 2
53         PY_VERSION = 2
54         msg.info('Using python v2')
55     except ImportError:
56         msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
57 # try:
58 #     import yaml
59 # except ImportError:
60 #     msg.fatal('Cannot find yaml library')
61 try:
62     if PY_VERSION == 3:
63         import pyyaml3 as yaml
64     else:
65         import pyyaml2 as yaml
66 except ImportError:
67     msg.fatal('Cannot find yaml library')
68
69
70 class PythonCharts(object):
71     """
72     Main class used to control every python module.
73     """
74
75     def __init__(self,
76                  modules=None,
77                  modules_path='../python.d/',
78                  modules_configs='../conf.d/',
79                  modules_disabled=None):
80         """
81         :param modules: list
82         :param modules_path: str
83         :param modules_configs: str
84         :param modules_disabled: list
85         """
86
87         if modules is None:
88             modules = []
89         if modules_disabled is None:
90             modules_disabled = []
91
92         self.first_run = True
93         # set configuration directory
94         self.configs = modules_configs
95
96         # load modules
97         loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
98
99         # load configuration files
100         configured_modules = self._load_configs(loaded_modules)
101
102         # good economy and prosperity:
103         self.jobs = self._create_jobs(configured_modules)  # type: list
104
105         # enable timetable override like `python.d.plugin mysql debug 1`
106         if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
107             for job in self.jobs:
108                 job.create_timetable(BASE_CONFIG['update_every'])
109
110     @staticmethod
111     def _import_module(path, name=None):
112         """
113         Try to import module using only its path.
114         :param path: str
115         :param name: str
116         :return: object
117         """
118
119         if name is None:
120             name = path.split('/')[-1]
121             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
122                 return None
123             name = name[:-len(MODULE_EXTENSION)]
124         try:
125             if PY_VERSION == 3:
126                 return importlib.machinery.SourceFileLoader(name, path).load_module()
127             else:
128                 return imp.load_source(name, path)
129         except Exception as e:
130             msg.error("Problem loading", name, str(e))
131             return None
132
133     def _load_modules(self, path, modules, disabled):
134         """
135         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
136         :param path: str
137         :param modules: list
138         :param disabled: list
139         :return: list
140         """
141
142         # check if plugin directory exists
143         if not os.path.isdir(path):
144             msg.fatal("cannot find charts directory ", path)
145
146         # load modules
147         loaded = []
148         if len(modules) > 0:
149             for m in modules:
150                 if m in disabled:
151                     continue
152                 mod = self._import_module(path + m + MODULE_EXTENSION)
153                 if mod is not None:
154                     loaded.append(mod)
155                 else:  # exit if plugin is not found
156                     msg.fatal('no modules found.')
157         else:
158             # scan directory specified in path and load all modules from there
159             names = os.listdir(path)
160             for mod in names:
161                 if mod.replace(MODULE_EXTENSION, "") in disabled:
162                     msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
163                     continue
164                 m = self._import_module(path + mod)
165                 if m is not None:
166                     msg.debug(mod + ": loading module '" + path + mod + "'")
167                     loaded.append(m)
168         return loaded
169
170     def _load_configs(self, modules):
171         """
172         Append configuration in list named `config` to every module.
173         For multi-job modules `config` list is created in _parse_config,
174         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
175         :param modules: list
176         :return: list
177         """
178         for mod in modules:
179             configfile = self.configs + mod.__name__ + ".conf"
180             if os.path.isfile(configfile):
181                 msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
182                 try:
183                     if not hasattr(mod, 'config'):
184                         mod.config = {}
185                     setattr(mod,
186                             'config',
187                             self._parse_config(mod, read_config(configfile)))
188                 except Exception as e:
189                     msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
190             else:
191                 msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
192                 # set config if not found
193                 if not hasattr(mod, 'config'):
194                     msg.debug(mod.__name__ + ": setting configuration for only one job")
195                     mod.config = {None: {}}
196                     for var in BASE_CONFIG:
197                         try:
198                             mod.config[None][var] = getattr(mod, var)
199                         except AttributeError:
200                             mod.config[None][var] = BASE_CONFIG[var]
201         return modules
202
203     @staticmethod
204     def _parse_config(module, config):
205         """
206         Parse configuration file or extract configuration from module file.
207         Example of returned dictionary:
208             config = {'name': {
209                             'update_every': 2,
210                             'retries': 3,
211                             'priority': 30000
212                             'other_val': 123}}
213         :param module: object
214         :param config: dict
215         :return: dict
216         """
217         if config is None:
218             config = {}
219         # get default values
220         defaults = {}
221         msg.debug(module.__name__ + ": reading configuration")
222         for key in BASE_CONFIG:
223             try:
224                 # get defaults from module config
225                 defaults[key] = int(config.pop(key))
226             except (KeyError, ValueError):
227                 try:
228                     # get defaults from module source code
229                     defaults[key] = getattr(module, key)
230                 except (KeyError, ValueError, AttributeError):
231                     # if above failed, get defaults from global dict
232                     defaults[key] = BASE_CONFIG[key]
233
234         # check if there are dict in config dict
235         many_jobs = False
236         for name in config:
237             if type(config[name]) is dict:
238                 many_jobs = True
239                 break
240
241         # assign variables needed by supervisor to every job configuration
242         if many_jobs:
243             for name in config:
244                 for key in defaults:
245                     if key not in config[name]:
246                         config[name][key] = defaults[key]
247         # if only one job is needed, values doesn't have to be in dict (in YAML)
248         else:
249             config = {None: config.copy()}
250             config[None].update(defaults)
251
252         # return dictionary of jobs where every job has BASE_CONFIG variables
253         return config
254
255     @staticmethod
256     def _create_jobs(modules):
257         """
258         Create jobs based on module.config dictionary and module.Service class definition.
259         :param modules: list
260         :return: list
261         """
262         jobs = []
263         for module in modules:
264             for name in module.config:
265                 # register a new job
266                 conf = module.config[name]
267                 try:
268                     job = module.Service(configuration=conf, name=name)
269                 except Exception as e:
270                     msg.error(module.__name__ +
271                               ("/" + str(name) if name is not None else "") +
272                               ": cannot start job: '" +
273                               str(e))
274                     return None
275                 else:
276                     # set chart_name (needed to plot run time graphs)
277                     job.chart_name = module.__name__
278                     if name is not None:
279                         job.chart_name += "_" + name
280                 jobs.append(job)
281                 msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
282
283         return [j for j in jobs if j is not None]
284
285     def _stop(self, job, reason=None):
286         """
287         Stop specified job and remove it from self.jobs list
288         Also notifies user about job failure if DEBUG_FLAG is set
289         :param job: object
290         :param reason: str
291         """
292         prefix = job.__module__
293         if job.name is not None and len(job.name) != 0:
294             prefix += "/" + job.name
295         try:
296             msg.error("DISABLED:", prefix)
297             self.jobs.remove(job)
298         except Exception as e:
299             msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
300
301         # TODO remove section below and remove `reason`.
302         prefix += ": "
303         if reason is None:
304             return
305         elif reason[:3] == "no ":
306             msg.error(prefix +
307                       "does not seem to have " +
308                       reason[3:] +
309                       "() function. Disabling it.")
310         elif reason[:7] == "failed ":
311             msg.error(prefix +
312                       reason[7:] +
313                       "() function reports failure.")
314         elif reason[:13] == "configuration":
315             msg.error(prefix +
316                       "configuration file '" +
317                       self.configs +
318                       job.__module__ +
319                       ".conf' not found. Using defaults.")
320         elif reason[:11] == "misbehaving":
321             msg.error(prefix + "is " + reason)
322
323     def check(self):
324         """
325         Tries to execute check() on every job.
326         This cannot fail thus it is catching every exception
327         If job.check() fails job is stopped
328         """
329         i = 0
330         overridden = []
331         msg.debug("all job objects", str(self.jobs))
332         while i < len(self.jobs):
333             job = self.jobs[i]
334             try:
335                 if not job.check():
336                     msg.error(job.chart_name, "check() failed - disabling job")
337                     self._stop(job)
338                 else:
339                     msg.info("CHECKED OK:", job.chart_name)
340                     i += 1
341                     try:
342                         if job.override_name is not None:
343                             new_name = job.__module__ + '_' + job.override_name
344                             if new_name in overridden:
345                                 msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.")
346                                 self._stop(job)
347                                 i -= 1
348                             else:
349                                 job.name = job.override_name
350                                 msg.info("RENAMED:", new_name, ", from " + job.chart_name)
351                                 job.chart_name = new_name
352                                 overridden.append(job.chart_name)
353                     except Exception:
354                         pass
355             except AttributeError as e:
356                 self._stop(job)
357                 msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
358                 msg.debug(str(e))
359             except (UnboundLocalError, Exception) as e:
360                 msg.error(job.chart_name, str(e))
361                 self._stop(job)
362         msg.debug("overridden job names:", str(overridden))
363         msg.debug("all remaining job objects:", str(self.jobs))
364
365     def create(self):
366         """
367         Tries to execute create() on every job.
368         This cannot fail thus it is catching every exception.
369         If job.create() fails job is stopped.
370         This is also creating job run time chart.
371         """
372         i = 0
373         while i < len(self.jobs):
374             job = self.jobs[i]
375             try:
376                 if not job.create():
377                     msg.error(job.chart_name, "create function failed.")
378                     self._stop(job)
379                 else:
380                     chart = job.chart_name
381                     sys.stdout.write(
382                         "CHART netdata.plugin_pythond_" +
383                         chart +
384                         " '' 'Execution time for " +
385                         chart +
386                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
387                         str(job.timetable['freq']) +
388                         '\n')
389                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
390                     msg.debug("created charts for", job.chart_name)
391                     # sys.stdout.flush()
392                     i += 1
393             except AttributeError:
394                 msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
395                 self._stop(job)
396             except (UnboundLocalError, Exception) as e:
397                 msg.error(job.chart_name, str(e))
398                 self._stop(job)
399
400     def update(self):
401         """
402         Creates and supervises every job thread.
403         This will stay forever and ever and ever forever and ever it'll be the one...
404         """
405         for job in self.jobs:
406             job.start()
407
408         while True:
409             if threading.active_count() <= 1:
410                 msg.fatal("no more jobs")
411             time.sleep(1)
412
413
414 def read_config(path):
415     """
416     Read YAML configuration from specified file
417     :param path: str
418     :return: dict
419     """
420     try:
421         with open(path, 'r') as stream:
422             config = yaml.load(stream)
423     except (OSError, IOError):
424         msg.error(str(path), "is not a valid configuration file")
425         return None
426     except yaml.YAMLError as e:
427         msg.error(str(path), "is malformed:", e)
428         return None
429     return config
430
431
432 def parse_cmdline(directory, *commands):
433     """
434     Parse parameters from command line.
435     :param directory: str
436     :param commands: list of str
437     :return: dict
438     """
439     global DEBUG_FLAG, TRACE_FLAG
440     global OVERRIDE_UPDATE_EVERY
441     global BASE_CONFIG
442
443     changed_update = False
444     mods = []
445     for cmd in commands[1:]:
446         if cmd == "check":
447             pass
448         elif cmd == "debug" or cmd == "all":
449             DEBUG_FLAG = True
450             # redirect stderr to stdout?
451         elif cmd == "trace" or cmd == "all":
452             TRACE_FLAG = True
453         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
454             #DEBUG_FLAG = True
455             mods.append(cmd.replace(".chart.py", ""))
456         else:
457             try:
458                 BASE_CONFIG['update_every'] = int(cmd)
459                 changed_update = True
460             except ValueError:
461                 pass
462     if changed_update and DEBUG_FLAG:
463         OVERRIDE_UPDATE_EVERY = True
464         msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
465
466     msg.debug("started from", commands[0], "with options:", *commands[1:])
467
468     return mods
469
470
471 # if __name__ == '__main__':
472 def run():
473     """
474     Main program.
475     """
476     global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
477
478     # read configuration file
479     disabled = []
480     configfile = CONFIG_DIR + "python.d.conf"
481     msg.PROGRAM = PROGRAM
482     msg.info("reading configuration file:", configfile)
483     log_throttle = 200
484     log_interval = 3600
485
486     conf = read_config(configfile)
487     if conf is not None:
488         try:
489             # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
490             if conf['enabled'] is False:
491                 msg.fatal('disabled in configuration file.\n')
492         except (KeyError, TypeError):
493             pass
494
495         try:
496             for param in BASE_CONFIG:
497                 BASE_CONFIG[param] = conf[param]
498         except (KeyError, TypeError):
499             pass  # use default update_every from NETDATA_UPDATE_EVERY
500
501         try:
502             DEBUG_FLAG = conf['debug']
503         except (KeyError, TypeError):
504             pass
505
506         try:
507             TRACE_FLAG = conf['trace']
508         except (KeyError, TypeError):
509             pass
510
511         try:
512             log_throttle = conf['logs_per_interval']
513         except (KeyError, TypeError):
514             pass
515
516         try:
517             log_interval = conf['log_interval']
518         except (KeyError, TypeError):
519             pass
520
521         for k, v in conf.items():
522             if k in ("update_every", "debug", "enabled"):
523                 continue
524             if v is False:
525                 disabled.append(k)
526
527     # parse passed command line arguments
528     modules = parse_cmdline(MODULES_DIR, *sys.argv)
529     msg.DEBUG_FLAG = DEBUG_FLAG
530     msg.TRACE_FLAG = TRACE_FLAG
531     msg.LOG_THROTTLE = log_throttle
532     msg.LOG_INTERVAL = log_interval
533     msg.LOG_COUNTER = 0
534     msg.LOG_NEXT_CHECK = 0
535     msg.info("MODULES_DIR='" + MODULES_DIR +
536              "', CONFIG_DIR='" + CONFIG_DIR +
537              "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
538              ", ONLY_MODULES=" + str(modules))
539
540     # run plugins
541     charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
542     charts.check()
543     charts.create()
544     charts.update()
545     msg.fatal("finished")
546
547
548 if __name__ == '__main__':
549     run()