]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
pythin.d.plugin: add nginx_log and gunicorn_log to disabled list
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env bash
2 '''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
3 # -*- coding: utf-8 -*-
4
5 # Description: netdata python modules supervisor
6 # Author: Pawel Krupa (paulfantom)
7
8 import os
9 import sys
10 import time
11 import threading
12 from re import sub
13
14 # -----------------------------------------------------------------------------
15 # globals & environment setup
16 # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
17 MODULE_EXTENSION = ".chart.py"
18 BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
19                'priority': 90000,
20                'retries': 10}
21
22 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
23                                         os.path.dirname(__file__)) + "/../python.d") + "/"
24 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
25 # directories should end with '/'
26 if CONFIG_DIR[-1] != "/":
27     CONFIG_DIR += "/"
28 sys.path.append(MODULES_DIR + "python_modules")
29
30 PROGRAM = os.path.basename(__file__).replace(".plugin", "")
31 DEBUG_FLAG = False
32 TRACE_FLAG = False
33 OVERRIDE_UPDATE_EVERY = False
34
35 # -----------------------------------------------------------------------------
36 # custom, third party and version specific python modules management
37 import msg
38
39 try:
40     assert sys.version_info >= (3, 1)
41     import importlib.machinery
42     PY_VERSION = 3
43     # change this hack below if we want PY_VERSION to be used in modules
44     # import builtins
45     # builtins.PY_VERSION = 3
46     msg.info('Using python v3')
47 except (AssertionError, ImportError):
48     try:
49         import imp
50
51         # change this hack below if we want PY_VERSION to be used in modules
52         # import __builtin__
53         # __builtin__.PY_VERSION = 2
54         PY_VERSION = 2
55         msg.info('Using python v2')
56     except ImportError:
57         msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
58 # try:
59 #     import yaml
60 # except ImportError:
61 #     msg.fatal('Cannot find yaml library')
62 try:
63     if PY_VERSION == 3:
64         import pyyaml3 as yaml
65     else:
66         import pyyaml2 as yaml
67 except ImportError:
68     msg.fatal('Cannot find yaml library')
69
70 try:
71     from collections import OrderedDict
72     ORDERED = True
73     DICT = OrderedDict
74     msg.info('YAML output is ordered')
75 except ImportError:
76     ORDERED = False
77     DICT = dict
78     msg.info('YAML output is unordered')
79 else:
80     def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
81         class OrderedLoader(Loader):
82             pass
83         def construct_mapping(loader, node):
84            loader.flatten_mapping(node)
85            return object_pairs_hook(loader.construct_pairs(node))
86         OrderedLoader.add_constructor(
87             yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
88             construct_mapping)
89         return yaml.load(stream, OrderedLoader)
90
91 class PythonCharts(object):
92     """
93     Main class used to control every python module.
94     """
95
96     def __init__(self,
97                  modules=None,
98                  modules_path='../python.d/',
99                  modules_configs='../conf.d/',
100                  modules_disabled=None):
101         """
102         :param modules: list
103         :param modules_path: str
104         :param modules_configs: str
105         :param modules_disabled: list
106         """
107
108         if modules is None:
109             modules = []
110         if modules_disabled is None:
111             modules_disabled = []
112
113         self.first_run = True
114         # set configuration directory
115         self.configs = modules_configs
116
117         # load modules
118         loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
119
120         # load configuration files
121         configured_modules = self._load_configs(loaded_modules)
122
123         # good economy and prosperity:
124         self.jobs = self._create_jobs(configured_modules)  # type: list
125
126         # enable timetable override like `python.d.plugin mysql debug 1`
127         if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
128             for job in self.jobs:
129                 job.create_timetable(BASE_CONFIG['update_every'])
130
131     @staticmethod
132     def _import_module(path, name=None):
133         """
134         Try to import module using only its path.
135         :param path: str
136         :param name: str
137         :return: object
138         """
139
140         if name is None:
141             name = path.split('/')[-1]
142             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
143                 return None
144             name = name[:-len(MODULE_EXTENSION)]
145         try:
146             if PY_VERSION == 3:
147                 return importlib.machinery.SourceFileLoader(name, path).load_module()
148             else:
149                 return imp.load_source(name, path)
150         except Exception as e:
151             msg.error("Problem loading", name, str(e))
152             return None
153
154     def _load_modules(self, path, modules, disabled):
155         """
156         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
157         :param path: str
158         :param modules: list
159         :param disabled: list
160         :return: list
161         """
162
163         # check if plugin directory exists
164         if not os.path.isdir(path):
165             msg.fatal("cannot find charts directory ", path)
166
167         # load modules
168         loaded = []
169         if len(modules) > 0:
170             for m in modules:
171                 if m in disabled:
172                     continue
173                 mod = self._import_module(path + m + MODULE_EXTENSION)
174                 if mod is not None:
175                     loaded.append(mod)
176                 else:  # exit if plugin is not found
177                     msg.fatal('no modules found.')
178         else:
179             # scan directory specified in path and load all modules from there
180             names = os.listdir(path)
181             for mod in names:
182                 if mod.replace(MODULE_EXTENSION, "") in disabled:
183                     msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
184                     continue
185                 m = self._import_module(path + mod)
186                 if m is not None:
187                     msg.debug(mod + ": loading module '" + path + mod + "'")
188                     loaded.append(m)
189         return loaded
190
191     def _load_configs(self, modules):
192         """
193         Append configuration in list named `config` to every module.
194         For multi-job modules `config` list is created in _parse_config,
195         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
196         :param modules: list
197         :return: list
198         """
199         for mod in modules:
200             configfile = self.configs + mod.__name__ + ".conf"
201             if os.path.isfile(configfile):
202                 msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
203                 try:
204                     if not hasattr(mod, 'config'):
205                         mod.config = {}
206                     setattr(mod,
207                             'config',
208                             self._parse_config(mod, read_config(configfile)))
209                 except Exception as e:
210                     msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
211             else:
212                 msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
213                 # set config if not found
214                 if not hasattr(mod, 'config'):
215                     msg.debug(mod.__name__ + ": setting configuration for only one job")
216                     mod.config = {None: {}}
217                     for var in BASE_CONFIG:
218                         try:
219                             mod.config[None][var] = getattr(mod, var)
220                         except AttributeError:
221                             mod.config[None][var] = BASE_CONFIG[var]
222         return modules
223
224     @staticmethod
225     def _parse_config(module, config):
226         """
227         Parse configuration file or extract configuration from module file.
228         Example of returned dictionary:
229             config = {'name': {
230                             'update_every': 2,
231                             'retries': 3,
232                             'priority': 30000
233                             'other_val': 123}}
234         :param module: object
235         :param config: dict
236         :return: dict
237         """
238         if config is None:
239             config = {}
240         # get default values
241         defaults = {}
242         msg.debug(module.__name__ + ": reading configuration")
243         for key in BASE_CONFIG:
244             try:
245                 # get defaults from module config
246                 defaults[key] = int(config.pop(key))
247             except (KeyError, ValueError):
248                 try:
249                     # get defaults from module source code
250                     defaults[key] = getattr(module, key)
251                 except (KeyError, ValueError, AttributeError):
252                     # if above failed, get defaults from global dict
253                     defaults[key] = BASE_CONFIG[key]
254
255         # check if there are dict in config dict
256         many_jobs = False
257         for name in config:
258             if isinstance(config[name], DICT):
259                 many_jobs = True
260                 break
261
262         # assign variables needed by supervisor to every job configuration
263         if many_jobs:
264             for name in config:
265                 for key in defaults:
266                     if key not in config[name]:
267                         config[name][key] = defaults[key]
268         # if only one job is needed, values doesn't have to be in dict (in YAML)
269         else:
270             config = {None: config.copy()}
271             config[None].update(defaults)
272
273         # return dictionary of jobs where every job has BASE_CONFIG variables
274         return config
275
276     @staticmethod
277     def _create_jobs(modules):
278         """
279         Create jobs based on module.config dictionary and module.Service class definition.
280         :param modules: list
281         :return: list
282         """
283         jobs = []
284         for module in modules:
285             for name in module.config:
286                 # register a new job
287                 conf = module.config[name]
288                 try:
289                     job = module.Service(configuration=conf, name=name)
290                 except Exception as e:
291                     msg.error(module.__name__ +
292                               ("/" + str(name) if name is not None else "") +
293                               ": cannot start job: '" +
294                               str(e))
295                     return None
296                 else:
297                     # set chart_name (needed to plot run time graphs)
298                     job.chart_name = module.__name__
299                     if name is not None:
300                         job.chart_name += "_" + name
301                 jobs.append(job)
302                 msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
303
304         return [j for j in jobs if j is not None]
305
306     def _stop(self, job, reason=None):
307         """
308         Stop specified job and remove it from self.jobs list
309         Also notifies user about job failure if DEBUG_FLAG is set
310         :param job: object
311         :param reason: str
312         """
313         prefix = job.__module__
314         if job.name is not None and len(job.name) != 0:
315             prefix += "/" + job.name
316         try:
317             msg.error("DISABLED:", prefix)
318             self.jobs.remove(job)
319         except Exception as e:
320             msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
321
322         # TODO remove section below and remove `reason`.
323         prefix += ": "
324         if reason is None:
325             return
326         elif reason[:3] == "no ":
327             msg.error(prefix +
328                       "does not seem to have " +
329                       reason[3:] +
330                       "() function. Disabling it.")
331         elif reason[:7] == "failed ":
332             msg.error(prefix +
333                       reason[7:] +
334                       "() function reports failure.")
335         elif reason[:13] == "configuration":
336             msg.error(prefix +
337                       "configuration file '" +
338                       self.configs +
339                       job.__module__ +
340                       ".conf' not found. Using defaults.")
341         elif reason[:11] == "misbehaving":
342             msg.error(prefix + "is " + reason)
343
344     def check(self):
345         """
346         Tries to execute check() on every job.
347         This cannot fail thus it is catching every exception
348         If job.check() fails job is stopped
349         """
350         i = 0
351         overridden = []
352         msg.debug("all job objects", str(self.jobs))
353         while i < len(self.jobs):
354             job = self.jobs[i]
355             try:
356                 if not job.check():
357                     msg.error(job.chart_name, "check() failed - disabling job")
358                     self._stop(job)
359                 else:
360                     msg.info("CHECKED OK:", job.chart_name)
361                     i += 1
362                     try:
363                         if job.override_name is not None:
364                             new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
365                             if new_name in overridden:
366                                 msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.")
367                                 self._stop(job)
368                                 i -= 1
369                             else:
370                                 job.name = job.override_name
371                                 msg.info("RENAMED:", new_name, ", from " + job.chart_name)
372                                 job.chart_name = new_name
373                                 overridden.append(job.chart_name)
374                     except Exception:
375                         pass
376             except AttributeError as e:
377                 self._stop(job)
378                 msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
379                 msg.debug(str(e))
380             except (UnboundLocalError, Exception) as e:
381                 msg.error(job.chart_name, str(e))
382                 self._stop(job)
383         msg.debug("overridden job names:", str(overridden))
384         msg.debug("all remaining job objects:", str(self.jobs))
385
386     def create(self):
387         """
388         Tries to execute create() on every job.
389         This cannot fail thus it is catching every exception.
390         If job.create() fails job is stopped.
391         This is also creating job run time chart.
392         """
393         i = 0
394         while i < len(self.jobs):
395             job = self.jobs[i]
396             try:
397                 if not job.create():
398                     msg.error(job.chart_name, "create function failed.")
399                     self._stop(job)
400                 else:
401                     chart = job.chart_name
402                     sys.stdout.write(
403                         "CHART netdata.plugin_pythond_" +
404                         chart +
405                         " '' 'Execution time for " +
406                         chart +
407                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
408                         str(job.timetable['freq']) +
409                         '\n')
410                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
411                     msg.debug("created charts for", job.chart_name)
412                     # sys.stdout.flush()
413                     i += 1
414             except AttributeError:
415                 msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
416                 self._stop(job)
417             except (UnboundLocalError, Exception) as e:
418                 msg.error(job.chart_name, str(e))
419                 self._stop(job)
420
421     def update(self):
422         """
423         Creates and supervises every job thread.
424         This will stay forever and ever and ever forever and ever it'll be the one...
425         """
426         for job in self.jobs:
427             job.start()
428
429         while True:
430             if threading.active_count() <= 1:
431                 msg.fatal("no more jobs")
432             time.sleep(1)
433
434
435 def read_config(path):
436     """
437     Read YAML configuration from specified file
438     :param path: str
439     :return: dict
440     """
441     try:
442         with open(path, 'r') as stream:
443             if ORDERED:
444                 config = ordered_load(stream, yaml.SafeLoader)
445             else:
446                 config = yaml.load(stream)
447     except (OSError, IOError):
448         msg.error(str(path), "is not a valid configuration file")
449         return None
450     except yaml.YAMLError as e:
451         msg.error(str(path), "is malformed:", e)
452         return None
453     return config
454
455
456 def parse_cmdline(directory, *commands):
457     """
458     Parse parameters from command line.
459     :param directory: str
460     :param commands: list of str
461     :return: dict
462     """
463     global DEBUG_FLAG, TRACE_FLAG
464     global OVERRIDE_UPDATE_EVERY
465     global BASE_CONFIG
466
467     changed_update = False
468     mods = []
469     for cmd in commands[1:]:
470         if cmd == "check":
471             pass
472         elif cmd == "debug" or cmd == "all":
473             DEBUG_FLAG = True
474             # redirect stderr to stdout?
475         elif cmd == "trace" or cmd == "all":
476             TRACE_FLAG = True
477         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
478             #DEBUG_FLAG = True
479             mods.append(cmd.replace(".chart.py", ""))
480         else:
481             try:
482                 BASE_CONFIG['update_every'] = int(cmd)
483                 changed_update = True
484             except ValueError:
485                 pass
486     if changed_update and DEBUG_FLAG:
487         OVERRIDE_UPDATE_EVERY = True
488         msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
489
490     msg.debug("started from", commands[0], "with options:", *commands[1:])
491
492     return mods
493
494
495 # if __name__ == '__main__':
496 def run():
497     """
498     Main program.
499     """
500     global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
501
502     # read configuration file
503     disabled = ['nginx_log', 'gunicorn_log']
504     configfile = CONFIG_DIR + "python.d.conf"
505     msg.PROGRAM = PROGRAM
506     msg.info("reading configuration file:", configfile)
507     log_throttle = 200
508     log_interval = 3600
509
510     conf = read_config(configfile)
511     if conf is not None:
512         try:
513             # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
514             if conf['enabled'] is False:
515                 msg.fatal('disabled in configuration file.\n')
516         except (KeyError, TypeError):
517             pass
518
519         try:
520             for param in BASE_CONFIG:
521                 BASE_CONFIG[param] = conf[param]
522         except (KeyError, TypeError):
523             pass  # use default update_every from NETDATA_UPDATE_EVERY
524
525         try:
526             DEBUG_FLAG = conf['debug']
527         except (KeyError, TypeError):
528             pass
529
530         try:
531             TRACE_FLAG = conf['trace']
532         except (KeyError, TypeError):
533             pass
534
535         try:
536             log_throttle = conf['logs_per_interval']
537         except (KeyError, TypeError):
538             pass
539
540         try:
541             log_interval = conf['log_interval']
542         except (KeyError, TypeError):
543             pass
544
545         for k, v in conf.items():
546             if k in ("update_every", "debug", "enabled"):
547                 continue
548             if v is False:
549                 disabled.append(k)
550
551     # parse passed command line arguments
552     modules = parse_cmdline(MODULES_DIR, *sys.argv)
553     msg.DEBUG_FLAG = DEBUG_FLAG
554     msg.TRACE_FLAG = TRACE_FLAG
555     msg.LOG_THROTTLE = log_throttle
556     msg.LOG_INTERVAL = log_interval
557     msg.LOG_COUNTER = 0
558     msg.LOG_NEXT_CHECK = 0
559     msg.info("MODULES_DIR='" + MODULES_DIR +
560              "', CONFIG_DIR='" + CONFIG_DIR +
561              "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
562              ", ONLY_MODULES=" + str(modules))
563
564     # run plugins
565     charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
566     charts.check()
567     charts.create()
568     charts.update()
569     msg.fatal("finished")
570
571
572 if __name__ == '__main__':
573     run()