]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
Merge pull request #600 from paulfantom/master
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # Description: netdata python modules supervisor
5 # Author: Pawel Krupa (paulfantom)
6
7 import os
8 import sys
9 import time
10
11 # -----------------------------------------------------------------------------
12 # globals & environment setup
13 # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
14 MODULE_EXTENSION = ".chart.py"
15 BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
16                'priority': 90000,
17                'retries': 10}
18
19 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
20                               os.path.dirname(__file__)) + "/../python.d") + "/"
21 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
22 # directories should end with '/'
23 if CONFIG_DIR[-1] != "/":
24     CONFIG_DIR += "/"
25 sys.path.append(MODULES_DIR + "python_modules")
26
27 PROGRAM = os.path.basename(__file__).replace(".plugin", "")
28 DEBUG_FLAG = False
29 OVERRIDE_UPDATE_EVERY = False
30
31
32 # -----------------------------------------------------------------------------
33 # logging functions
34 def log_msg(msg_type, *args):
35     """
36     Print message on stderr.
37     :param msg_type: str
38     """
39     sys.stderr.write(PROGRAM)
40     sys.stderr.write(" ")
41     sys.stderr.write(msg_type)
42     sys.stderr.write(": ")
43     for i in args:
44         sys.stderr.write(" ")
45         sys.stderr.write(str(i))
46     sys.stderr.write("\n")
47     sys.stderr.flush()
48
49
50 def debug(*args):
51     """
52     Print debug message on stderr.
53     """
54     if not DEBUG_FLAG:
55         return
56
57     log_msg("DEBUG", *args)
58
59
60 def error(*args):
61     """
62     Print message on stderr.
63     """
64     log_msg("ERROR", *args)
65
66
67 def info(*args):
68     """
69     Print message on stderr.
70     """
71     log_msg("INFO", *args)
72
73
74 def fatal(*args):
75     """
76     Print message on stderr and exit.
77     """
78     log_msg("FATAL", *args)
79     sys.stdout.write('DISABLE\n')
80     sys.exit(1)
81
82
83 # -----------------------------------------------------------------------------
84 # third party and version specific python modules management
85 try:
86     assert sys.version_info >= (3, 1)
87     import importlib.machinery
88
89     # change this hack below if we want PY_VERSION to be used in modules
90     # import builtins
91     # builtins.PY_VERSION = 3
92     PY_VERSION = 3
93     info('Using python v3')
94 except (AssertionError, ImportError):
95     try:
96         import imp
97
98         # change this hack below if we want PY_VERSION to be used in modules
99         # import __builtin__
100         # __builtin__.PY_VERSION = 2
101         PY_VERSION = 2
102         info('Using python v2')
103     except ImportError:
104         fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
105 try:
106     import yaml
107 except ImportError:
108     fatal('Cannot find yaml library')
109
110
111 class PythonCharts(object):
112     """
113     Main class used to control every python module.
114     """
115     def __init__(self,
116                  modules=None,
117                  modules_path='../python.d/',
118                  modules_configs='../conf.d/',
119                  modules_disabled=None):
120         """
121         :param modules: list
122         :param modules_path: str
123         :param modules_configs: str
124         :param modules_disabled: list
125         """
126
127         if modules is None:
128             modules = []
129         if modules_disabled is None:
130             modules_disabled = []
131
132         self.first_run = True
133         # set configuration directory
134         self.configs = modules_configs
135
136         # load modules
137         loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
138
139         # load configuration files
140         configured_modules = self._load_configs(loaded_modules)
141
142         # good economy and prosperity:
143         self.jobs = self._create_jobs(configured_modules)  # type: list
144
145         # enable timetable override like `python.d.plugin mysql debug 1`
146         if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
147             for job in self.jobs:
148                 job.create_timetable(BASE_CONFIG['update_every'])
149
150     @staticmethod
151     def _import_module(path, name=None):
152         """
153         Try to import module using only its path.
154         :param path: str
155         :param name: str
156         :return: object
157         """
158
159         if name is None:
160             name = path.split('/')[-1]
161             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
162                 return None
163             name = name[:-len(MODULE_EXTENSION)]
164         try:
165             if PY_VERSION == 3:
166                 return importlib.machinery.SourceFileLoader(name, path).load_module()
167             else:
168                 return imp.load_source(name, path)
169         except Exception as e:
170             error("Problem loading", name, str(e))
171             return None
172
173     def _load_modules(self, path, modules, disabled):
174         """
175         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
176         :param path: str
177         :param modules: list
178         :param disabled: list
179         :return: list
180         """
181
182         # check if plugin directory exists
183         if not os.path.isdir(path):
184             fatal("cannot find charts directory ", path)
185
186         # load modules
187         loaded = []
188         if len(modules) > 0:
189             for m in modules:
190                 if m in disabled:
191                     continue
192                 mod = self._import_module(path + m + MODULE_EXTENSION)
193                 if mod is not None:
194                     loaded.append(mod)
195                 else:  # exit if plugin is not found
196                     fatal('no modules found.')
197         else:
198             # scan directory specified in path and load all modules from there
199             names = os.listdir(path)
200             for mod in names:
201                 if mod.replace(MODULE_EXTENSION, "") in disabled:
202                     error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
203                     continue
204                 m = self._import_module(path + mod)
205                 if m is not None:
206                     debug(mod + ": loading module '" + path + mod + "'")
207                     loaded.append(m)
208         return loaded
209
210     def _load_configs(self, modules):
211         """
212         Append configuration in list named `config` to every module.
213         For multi-job modules `config` list is created in _parse_config,
214         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
215         :param modules: list
216         :return: list
217         """
218         for mod in modules:
219             configfile = self.configs + mod.__name__ + ".conf"
220             if os.path.isfile(configfile):
221                 debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
222                 try:
223                     if not hasattr(mod, 'config'):
224                         mod.config = {}
225                     setattr(mod,
226                             'config',
227                             self._parse_config(mod, read_config(configfile)))
228                 except Exception as e:
229                     error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
230             else:
231                 error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
232                 # set config if not found
233                 if not hasattr(mod, 'config'):
234                     debug(mod.__name__ + ": setting configuration for only one job")
235                     mod.config = {None: {}}
236                     for var in BASE_CONFIG:
237                         try:
238                             mod.config[None][var] = getattr(mod, var)
239                         except AttributeError:
240                             mod.config[None][var] = BASE_CONFIG[var]
241         return modules
242
243     @staticmethod
244     def _parse_config(module, config):
245         """
246         Parse configuration file or extract configuration from module file.
247         Example of returned dictionary:
248             config = {'name': {
249                             'update_every': 2,
250                             'retries': 3,
251                             'priority': 30000
252                             'other_val': 123}}
253         :param module: object
254         :param config: dict
255         :return: dict
256         """
257         # get default values
258         defaults = {}
259         debug(module.__name__ + ": reading configuration")
260         for key in BASE_CONFIG:
261             try:
262                 # get defaults from module config
263                 defaults[key] = int(config.pop(key))
264             except (KeyError, ValueError):
265                 try:
266                     # get defaults from module source code
267                     defaults[key] = getattr(module, key)
268                 except (KeyError, ValueError, AttributeError):
269                     # if above failed, get defaults from global dict
270                     defaults[key] = BASE_CONFIG[key]
271
272         # check if there are dict in config dict
273         many_jobs = False
274         for name in config:
275             if type(config[name]) is dict:
276                 many_jobs = True
277                 break
278
279         # assign variables needed by supervisor to every job configuration
280         if many_jobs:
281             for name in config:
282                 for key in defaults:
283                     if key not in config[name]:
284                         config[name][key] = defaults[key]
285         # if only one job is needed, values doesn't have to be in dict (in YAML)
286         else:
287             config = {None: config.copy()}
288             config[None].update(defaults)
289
290         # return dictionary of jobs where every job has BASE_CONFIG variables
291         return config
292
293     @staticmethod
294     def _create_jobs(modules):
295         """
296         Create jobs based on module.config dictionary and module.Service class definition.
297         :param modules: list
298         :return: list
299         """
300         jobs = []
301         for module in modules:
302             for name in module.config:
303                 # register a new job
304                 conf = module.config[name]
305                 try:
306                     job = module.Service(configuration=conf, name=name)
307                 except Exception as e:
308                     error(module.__name__ +
309                           ("/" + str(name) if name is not None else "") +
310                           ": cannot start job: '" +
311                           str(e))
312                     return None
313                 else:
314                     # set chart_name (needed to plot run time graphs)
315                     job.chart_name = module.__name__
316                     if name is not None:
317                         job.chart_name += "_" + name
318                 jobs.append(job)
319                 debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
320
321         return [j for j in jobs if j is not None]
322
323     def _stop(self, job, reason=None):
324         """
325         Stop specified job and remove it from self.jobs list
326         Also notifies user about job failure if DEBUG_FLAG is set
327         :param job: object
328         :param reason: str
329         """
330         prefix = job.__module__
331         if job.name is not None:
332             prefix += "/" + job.name
333         prefix += ": "
334
335         self.jobs.remove(job)
336         if reason is None:
337             return
338         elif reason[:3] == "no ":
339             error(prefix +
340                   "does not seem to have " +
341                   reason[3:] +
342                   "() function. Disabling it.")
343         elif reason[:7] == "failed ":
344             error(prefix +
345                   reason[7:] +
346                   "() function reports failure.")
347         elif reason[:13] == "configuration":
348             error(prefix +
349                   "configuration file '" +
350                   self.configs +
351                   job.__module__ +
352                   ".conf' not found. Using defaults.")
353         elif reason[:11] == "misbehaving":
354             error(prefix + "is " + reason)
355
356     def check(self):
357         """
358         Tries to execute check() on every job.
359         This cannot fail thus it is catching every exception
360         If job.check() fails job is stopped
361         """
362         i = 0
363         while i < len(self.jobs):
364             job = self.jobs[i]
365             try:
366                 if not job.check():
367                     self._stop(job, "failed check")
368                 else:
369                     debug(job.chart_name, ": check succeeded")
370                     i += 1
371             except AttributeError:
372                 self._stop(job, "no check")
373             except (UnboundLocalError, Exception) as e:
374                 self._stop(job, "misbehaving. Reason:" + str(e))
375
376     def create(self):
377         """
378         Tries to execute create() on every job.
379         This cannot fail thus it is catching every exception.
380         If job.create() fails job is stopped.
381         This is also creating job run time chart.
382         """
383         i = 0
384         while i < len(self.jobs):
385             job = self.jobs[i]
386             try:
387                 if not job.create():
388                     self._stop(job, "failed create")
389                 else:
390                     chart = job.chart_name
391                     sys.stdout.write(
392                         "CHART netdata.plugin_pythond_" +
393                         chart +
394                         " '' 'Execution time for " +
395                         chart +
396                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
397                         str(job.timetable['freq']) +
398                         '\n')
399                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
400                     debug("created charts for", job.chart_name)
401                     # sys.stdout.flush()
402                     i += 1
403             except AttributeError:
404                 self._stop(job, "no create")
405             except (UnboundLocalError, Exception) as e:
406                 self._stop(job, "misbehaving. Reason: " + str(e))
407
408     def _update_job(self, job):
409         """
410         Tries to execute update() on specified job.
411         This cannot fail thus it is catching every exception.
412         If job.update() returns False, number of retries_left is decremented.
413         If there are no more retries, job is stopped.
414         Job is also stopped if it throws an exception.
415         This is also updating job run time chart.
416         Return False if job is stopped
417         :param job: object
418         :return: boolean
419         """
420         t_start = time.time()
421         # check if it is time to execute job update() function
422         if job.timetable['next'] > t_start:
423             debug(job.chart_name + " will be run in " + str(int((job.timetable['next'] - t_start) * 1000)) + " ms")
424             return True
425         try:
426             if self.first_run:
427                 since_last = 0
428             else:
429                 since_last = int((t_start - job.timetable['last']) * 1000000)
430                 debug(job.chart_name +
431                       " ready to run, after " + str(int((t_start - job.timetable['last']) * 1000)) +
432                       " ms (update_every: " + str(job.timetable['freq'] * 1000) +
433                       " ms, latency: " + str(int((t_start - job.timetable['next']) * 1000)) + " ms)")
434             if not job.update(since_last):
435                 if job.retries_left <= 0:
436                     self._stop(job, "update failed")
437                     return False
438                 job.retries_left -= 1
439                 job.timetable['next'] += job.timetable['freq']
440                 return True
441         except AttributeError:
442             self._stop(job, "no update")
443             return False
444         except (UnboundLocalError, Exception) as e:
445             self._stop(job, "misbehaving. Reason: " + str(e))
446             return False
447         t_end = time.time()
448         job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
449         # draw performance graph
450         run_time = str(int((t_end - t_start) * 1000))
451         debug(job.chart_name, "updated in", run_time, "ms")
452         sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n')
453         sys.stdout.write("SET run_time = " + run_time + '\n')
454         sys.stdout.write("END\n")
455         # sys.stdout.flush()
456         job.timetable['last'] = t_start
457         job.retries_left = job.retries
458         self.first_run = False
459         return True
460
461     def update(self):
462         """
463         Tries to execute update() on every job by using _update_job()
464         This will stay forever and ever and ever forever and ever it'll be the one...
465         """
466         self.first_run = True
467         while True:
468             next_runs = []
469             i = 0
470             while i < len(self.jobs):
471                 job = self.jobs[i]
472                 if self._update_job(job):
473                     try:
474                         next_runs.append(job.timetable['next'])
475                     except KeyError:
476                         pass
477                     i += 1
478             if len(next_runs) == 0:
479                 fatal('no python.d modules loaded.')
480             try:
481                 time.sleep(min(next_runs) - time.time())
482             except IOError:
483                 pass
484
485
486 def read_config(path):
487     """
488     Read YAML configuration from specified file
489     :param path: str
490     :return: dict
491     """
492     try:
493         with open(path, 'r') as stream:
494             config = yaml.load(stream)
495     except (OSError, IOError):
496         error(str(path), "is not a valid configuration file")
497         return None
498     except yaml.YAMLError as e:
499         error(str(path), "is malformed:", e)
500         return None
501     return config
502
503
504 def parse_cmdline(directory, *commands):
505     """
506     Parse parameters from command line.
507     :param directory: str
508     :param commands: list of str
509     :return: dict
510     """
511     global DEBUG_FLAG
512     global OVERRIDE_UPDATE_EVERY
513     global BASE_CONFIG
514
515     changed_update = False
516     mods = []
517     for cmd in commands[1:]:
518         if cmd == "check":
519             pass
520         elif cmd == "debug" or cmd == "all":
521             DEBUG_FLAG = True
522             # redirect stderr to stdout?
523         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
524             DEBUG_FLAG = True
525             mods.append(cmd.replace(".chart.py", ""))
526         else:
527             try:
528                 BASE_CONFIG['update_every'] = int(cmd)
529                 changed_update = True
530             except ValueError:
531                 pass
532     if changed_update and DEBUG_FLAG:
533         OVERRIDE_UPDATE_EVERY = True
534         debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
535
536     debug("started from", commands[0], "with options:", *commands[1:])
537
538     return mods
539
540
541 # if __name__ == '__main__':
542 def run():
543     """
544     Main program.
545     """
546     global DEBUG_FLAG, BASE_CONFIG
547
548     # read configuration file
549     disabled = []
550     configfile = CONFIG_DIR + "python.d.conf"
551     debug(PROGRAM, "reading configuration file:", configfile)
552
553     conf = read_config(configfile)
554     if conf is not None:
555         try:
556             # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
557             if conf['enabled'] is False:
558                 fatal('disabled in configuration file.\n')
559         except (KeyError, TypeError):
560             pass
561         try:
562             for param in BASE_CONFIG:
563                 BASE_CONFIG[param] = conf[param]
564         except (KeyError, TypeError):
565             pass  # use default update_every from NETDATA_UPDATE_EVERY
566         try:
567             DEBUG_FLAG = conf['debug']
568         except (KeyError, TypeError):
569             pass
570         for k, v in conf.items():
571             if k in ("update_every", "debug", "enabled"):
572                 continue
573             if v is False:
574                 disabled.append(k)
575
576     # parse passed command line arguments
577     modules = parse_cmdline(MODULES_DIR, *sys.argv)
578     info("MODULES_DIR='" + MODULES_DIR +
579          "', CONFIG_DIR='" + CONFIG_DIR +
580          "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
581          ", ONLY_MODULES=" + str(modules))
582
583     # run plugins
584     charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
585     charts.check()
586     charts.create()
587     charts.update()
588     fatal("finished")
589
590
591 if __name__ == '__main__':
592     run()