]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
Merge pull request #1093 from romain-dartigues/rdartigues
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: netdata python modules framework
3 # Author: Pawel Krupa (paulfantom)
4
5 # Remember:
6 # ALL CODE NEEDS TO BE COMPATIBLE WITH Python > 2.7 and Python > 3.1
7 # Follow PEP8 as much as it is possible
8 # "check" and "create" CANNOT be blocking.
9 # "update" CAN be blocking
10 # "update" function needs to be fast, so follow:
11 #   https://wiki.python.org/moin/PythonSpeed/PerformanceTips
12 # basically:
13 #  - use local variables wherever it is possible
14 #  - avoid dots in expressions that are executed many times
15 #  - use "join()" instead of "+"
16 #  - use "import" only at the beginning
17 #
18 # using ".encode()" in one thread can block other threads as well (only in python2)
19
20 import time
21 # import sys
22 import os
23 import socket
24 import select
25 try:
26     import urllib.request as urllib2
27 except ImportError:
28     import urllib2
29
30 from subprocess import Popen, PIPE
31
32 import threading
33 import msg
34
35
36 # class BaseService(threading.Thread):
37 class SimpleService(threading.Thread):
38     """
39     Prototype of Service class.
40     Implemented basic functionality to run jobs by `python.d.plugin`
41     """
42     def __init__(self, configuration=None, name=None):
43         """
44         This needs to be initialized in child classes
45         :param configuration: dict
46         :param name: str
47         """
48         threading.Thread.__init__(self)
49         self._data_stream = ""
50         self.daemon = True
51         self.retries = 0
52         self.retries_left = 0
53         self.priority = 140000
54         self.update_every = 1
55         self.name = name
56         self.override_name = None
57         self.chart_name = ""
58         self._dimensions = []
59         self._charts = []
60         self.__chart_set = False
61         self.__first_run = True
62         self.order = []
63         self.definitions = {}
64         if configuration is None:
65             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
66             raise RuntimeError
67         else:
68             self._extract_base_config(configuration)
69             self.timetable = {}
70             self.create_timetable()
71
72     # --- BASIC SERVICE CONFIGURATION ---
73
74     def _extract_base_config(self, config):
75         """
76         Get basic parameters to run service
77         Minimum config:
78             config = {'update_every':1,
79                       'priority':100000,
80                       'retries':0}
81         :param config: dict
82         """
83         pop = config.pop
84         try:
85             self.override_name = pop('name')
86         except KeyError:
87             pass
88         self.update_every = int(pop('update_every'))
89         self.priority = int(pop('priority'))
90         self.retries = int(pop('retries'))
91         self.retries_left = self.retries
92         self.configuration = config
93
94     def create_timetable(self, freq=None):
95         """
96         Create service timetable.
97         `freq` is optional
98         Example:
99             timetable = {'last': 1466370091.3767564,
100                          'next': 1466370092,
101                          'freq': 1}
102         :param freq: int
103         """
104         if freq is None:
105             freq = self.update_every
106         now = time.time()
107         self.timetable = {'last': now,
108                           'next': now - (now % freq) + freq,
109                           'freq': freq}
110
111     # --- THREAD CONFIGURATION ---
112
113     def _run_once(self):
114         """
115         Executes self.update(interval) and draws run time chart.
116         Return value presents exit status of update()
117         :return: boolean
118         """
119         t_start = time.time()
120         timetable = self.timetable
121         chart_name = self.chart_name
122         # check if it is time to execute job update() function
123         if timetable['next'] > t_start:
124             self.debug(chart_name, "will be run in", str(int((timetable['next'] - t_start) * 1000)), "ms")
125             return True
126
127         since_last = int((t_start - timetable['last']) * 1000000)
128         self.debug(chart_name,
129                    "ready to run, after", str(int((t_start - timetable['last']) * 1000)),
130                    "ms (update_every:", str(timetable['freq'] * 1000),
131                    "ms, latency:", str(int((t_start - timetable['next']) * 1000)), "ms")
132         if self.__first_run:
133             since_last = 0
134         if not self.update(since_last):
135             self.error("update function failed.")
136             return False
137         t_end = time.time()
138         self.timetable['next'] = t_end - (t_end % timetable['freq']) + timetable['freq']
139         # draw performance graph
140         run_time = str(int((t_end - t_start) * 1000))
141         # noinspection SqlNoDataSourceInspection
142         print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
143               (self.chart_name, str(since_last), run_time))
144         # sys.stdout.write("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
145         #                  (self.chart_name, str(since_last), run_time))
146
147         self.debug(chart_name, "updated in", str(run_time), "ms")
148         self.timetable['last'] = t_start
149         self.__first_run = False
150         return True
151
152     def run(self):
153         """
154         Runs job in thread. Handles retries.
155         Exits when job failed or timed out.
156         :return: None
157         """
158         self.timetable['last'] = time.time()
159         while True:  # run forever, unless something is wrong
160             try:
161                 status = self._run_once()
162             except Exception as e:
163                 self.error("Something wrong: ", str(e))
164                 return
165             if status:  # handle retries if update failed
166                 time.sleep(self.timetable['next'] - time.time())
167                 self.retries_left = self.retries
168             else:
169                 self.retries_left -= 1
170                 if self.retries_left <= 0:
171                     self.error("no more retries. Exiting")
172                     return
173                 else:
174                     time.sleep(self.timetable['freq'])
175
176     # --- CHART ---
177
178     @staticmethod
179     def _format(*args):
180         """
181         Escape and convert passed arguments.
182         :param args: anything
183         :return: list
184         """
185         params = []
186         append = params.append
187         for p in args:
188             if p is None:
189                 append(p)
190                 continue
191             if type(p) is not str:
192                 p = str(p)
193             if ' ' in p:
194                 p = "'" + p + "'"
195             append(p)
196         return params
197
198     def _line(self, instruction, *params):
199         """
200         Converts *params to string and joins them with one space between every one.
201         Result is appended to self._data_stream
202         :param params: str/int/float
203         """
204         tmp = list(map((lambda x: "''" if x is None or len(x) == 0 else x), params))
205         self._data_stream += "%s %s\n" % (instruction, str(" ".join(tmp)))
206
207     def chart(self, type_id, name="", title="", units="", family="",
208               category="", chart_type="line", priority="", update_every=""):
209         """
210         Defines a new chart.
211         :param type_id: str
212         :param name: str
213         :param title: str
214         :param units: str
215         :param family: str
216         :param category: str
217         :param chart_type: str
218         :param priority: int/str
219         :param update_every: int/str
220         """
221         self._charts.append(type_id)
222
223         p = self._format(type_id, name, title, units, family, category, chart_type, priority, update_every)
224         self._line("CHART", *p)
225
226     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
227         """
228         Defines a new dimension for the chart
229         :param id: str
230         :param name: str
231         :param algorithm: str
232         :param multiplier: int/str
233         :param divisor: int/str
234         :param hidden: boolean
235         :return:
236         """
237         try:
238             int(multiplier)
239         except TypeError:
240             self.error("malformed dimension: multiplier is not a number:", multiplier)
241             multiplier = 1
242         try:
243             int(divisor)
244         except TypeError:
245             self.error("malformed dimension: divisor is not a number:", divisor)
246             divisor = 1
247         if name is None:
248             name = id
249         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
250             algorithm = "absolute"
251
252         self._dimensions.append(str(id))
253         if hidden:
254             p = self._format(id, name, algorithm, multiplier, divisor, "hidden")
255         else:
256             p = self._format(id, name, algorithm, multiplier, divisor)
257
258         self._line("DIMENSION", *p)
259
260     def begin(self, type_id, microseconds=0):
261         """
262         Begin data set
263         :param type_id: str
264         :param microseconds: int
265         :return: boolean
266         """
267         if type_id not in self._charts:
268             self.error("wrong chart type_id:", type_id)
269             return False
270         try:
271             int(microseconds)
272         except TypeError:
273             self.error("malformed begin statement: microseconds are not a number:", microseconds)
274             microseconds = ""
275
276         self._line("BEGIN", type_id, str(microseconds))
277         return True
278
279     def set(self, id, value):
280         """
281         Set value to dimension
282         :param id: str
283         :param value: int/float
284         :return: boolean
285         """
286         if id not in self._dimensions:
287             self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions)
288             return False
289         try:
290             value = str(int(value))
291         except TypeError:
292             self.error("cannot set non-numeric value:", str(value))
293             return False
294         self._line("SET", id, "=", str(value))
295         self.__chart_set = True
296         return True
297
298     def end(self):
299         if self.__chart_set:
300             self._line("END")
301             self.__chart_set = False
302         else:
303             pos = self._data_stream.rfind("BEGIN")
304             self._data_stream = self._data_stream[:pos]
305
306     def commit(self):
307         """
308         Upload new data to netdata.
309         """
310         print(self._data_stream)
311         self._data_stream = ""
312
313     # --- ERROR HANDLING ---
314
315     def error(self, *params):
316         """
317         Show error message on stderr
318         """
319         msg.error(self.chart_name, *params)
320
321     def debug(self, *params):
322         """
323         Show debug message on stderr
324         """
325         msg.debug(self.chart_name, *params)
326
327     def info(self, *params):
328         """
329         Show information message on stderr
330         """
331         msg.info(self.chart_name, *params)
332
333     # --- MAIN METHODS ---
334
335     def _get_data(self):
336         """
337         Get some data
338         :return: dict
339         """
340         return {}
341
342     def check(self):
343         """
344         check() prototype
345         :return: boolean
346         """
347         self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.")
348         if self._get_data() is None or len(self._get_data()) == 0:
349             return False
350         else:
351             return True
352
353     def create(self):
354         """
355         Create charts
356         :return: boolean
357         """
358         data = self._get_data()
359         if data is None:
360             return False
361
362         idx = 0
363         for name in self.order:
364             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
365             self.chart(self.chart_name + "." + name, *options)
366             # check if server has this datapoint
367             for line in self.definitions[name]['lines']:
368                 if line[0] in data:
369                     self.dimension(*line)
370             idx += 1
371
372         self.commit()
373         return True
374
375     def update(self, interval):
376         """
377         Update charts
378         :param interval: int
379         :return: boolean
380         """
381         data = self._get_data()
382         if data is None:
383             self.debug("_get_data() returned no data")
384             return False
385
386         updated = False
387         for chart in self.order:
388             if self.begin(self.chart_name + "." + chart, interval):
389                 updated = True
390                 for dim in self.definitions[chart]['lines']:
391                     try:
392                         self.set(dim[0], data[dim[0]])
393                     except KeyError:
394                         pass
395                 self.end()
396
397         self.commit()
398         if not updated:
399             self.error("no charts to update")
400
401         return updated
402
403
404 class UrlService(SimpleService):
405     # TODO add support for https connections
406     def __init__(self, configuration=None, name=None):
407         self.url = ""
408         self.user = None
409         self.password = None
410         self.proxies = {}
411         SimpleService.__init__(self, configuration=configuration, name=name)
412
413     def __add_openers(self):
414         # TODO add error handling
415         self.opener = urllib2.build_opener()
416
417         # Proxy handling
418         # TODO currently self.proxies isn't parsed from configuration file
419         # if len(self.proxies) > 0:
420         #     for proxy in self.proxies:
421         #         url = proxy['url']
422         #         # TODO test this:
423         #         if "user" in proxy and "pass" in proxy:
424         #             if url.lower().startswith('https://'):
425         #                 url = 'https://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[8:]
426         #             else:
427         #                 url = 'http://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[7:]
428         #         # FIXME move proxy auth to sth like this:
429         #         #     passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
430         #         #     passman.add_password(None, url, proxy['user'], proxy['password'])
431         #         #     opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
432         #
433         #         if url.lower().startswith('https://'):
434         #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
435         #         else:
436         #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
437
438         # HTTP Basic Auth
439         if self.user is not None and self.password is not None:
440             passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
441             passman.add_password(None, self.url, self.user, self.password)
442             self.opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
443             self.debug("Enabling HTTP basic auth")
444
445         #urllib2.install_opener(opener)
446
447     def _get_raw_data(self):
448         """
449         Get raw data from http request
450         :return: str
451         """
452         raw = None
453         try:
454             f = self.opener.open(self.url, timeout=self.update_every * 2)
455             # f = urllib2.urlopen(self.url, timeout=self.update_every * 2)
456         except Exception as e:
457             self.error(str(e))
458             return None
459
460         try:
461             raw = f.read().decode('utf-8')
462         except Exception as e:
463             self.error(str(e))
464         finally:
465             f.close()
466         return raw
467
468     def check(self):
469         """
470         Format configuration data and try to connect to server
471         :return: boolean
472         """
473         if self.name is None or self.name == str(None):
474             self.name = 'local'
475             self.chart_name += "_" + self.name
476         else:
477             self.name = str(self.name)
478         try:
479             self.url = str(self.configuration['url'])
480         except (KeyError, TypeError):
481             pass
482         try:
483             self.user = str(self.configuration['user'])
484         except (KeyError, TypeError):
485             pass
486         try:
487             self.password = str(self.configuration['pass'])
488         except (KeyError, TypeError):
489             pass
490
491         self.__add_openers()
492
493         test = self._get_data()
494         if test is None or len(test) == 0:
495             return False
496         else:
497             return True
498
499
500 class SocketService(SimpleService):
501     def __init__(self, configuration=None, name=None):
502         self._sock = None
503         self._keep_alive = False
504         self.host = "localhost"
505         self.port = None
506         self.unix_socket = None
507         self.request = ""
508         self.__socket_config = None
509         SimpleService.__init__(self, configuration=configuration, name=name)
510
511     def _connect(self):
512         """
513         Recreate socket and connect to it since sockets cannot be reused after closing
514         Available configurations are IPv6, IPv4 or UNIX socket
515         :return:
516         """
517         try:
518             if self.unix_socket is None:
519                 if self.__socket_config is None:
520                     # establish ipv6 or ipv4 connection.
521                     for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
522                         try:
523                             # noinspection SpellCheckingInspection
524                             af, socktype, proto, canonname, sa = res
525                             self._sock = socket.socket(af, socktype, proto)
526                         except socket.error as e:
527                             self.debug("Cannot create socket:", str(e))
528                             self._sock = None
529                             continue
530                         try:
531                             self._sock.connect(sa)
532                         except socket.error as e:
533                             self.debug("Cannot connect to socket:", str(e))
534                             self._disconnect()
535                             continue
536                         self.__socket_config = res
537                         break
538                 else:
539                     # connect to socket with previously established configuration
540                     try:
541                         af, socktype, proto, canonname, sa = self.__socket_config
542                         self._sock = socket.socket(af, socktype, proto)
543                         self._sock.connect(sa)
544                     except socket.error as e:
545                         self.debug("Cannot create or connect to socket:", str(e))
546                         self._disconnect()
547             else:
548                 # connect to unix socket
549                 try:
550                     self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
551                     self._sock.connect(self.unix_socket)
552                 except socket.error:
553                     self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
554                     self._sock.connect(self.unix_socket)
555
556         except Exception as e:
557             self.error(str(e),
558                        "Cannot create socket with following configuration: host:", str(self.host),
559                        "port:", str(self.port),
560                        "socket:", str(self.unix_socket))
561             self._sock = None
562         self._sock.setblocking(0)
563
564     def _disconnect(self):
565         """
566         Close socket connection
567         :return:
568         """
569         try:
570             self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
571             self._sock.close()
572         except Exception:
573             pass
574         self._sock = None
575
576     def _send(self):
577         """
578         Send request.
579         :return: boolean
580         """
581         # Send request if it is needed
582         if self.request != "".encode():
583             try:
584                 self._sock.send(self.request)
585             except Exception as e:
586                 self._disconnect()
587                 self.error(str(e),
588                            "used configuration: host:", str(self.host),
589                            "port:", str(self.port),
590                            "socket:", str(self.unix_socket))
591                 return False
592         return True
593
594     def _receive(self):
595         """
596         Receive data from socket
597         :return: str
598         """
599         data = ""
600         while True:
601             try:
602                 ready_to_read, _, in_error = select.select([self._sock], [], [], 5)
603             except Exception as e:
604                 self.debug("SELECT", str(e))
605                 self._disconnect()
606                 break
607             if len(ready_to_read) > 0:
608                 buf = self._sock.recv(4096)
609                 if len(buf) == 0 or buf is None:  # handle server disconnect
610                     break
611                 data += buf.decode(errors='ignore')
612                 if self._check_raw_data(data):
613                     break
614             else:
615                 self.error("Socket timed out.")
616                 self._disconnect()
617                 break
618
619         return data
620
621     def _get_raw_data(self):
622         """
623         Get raw data with low-level "socket" module.
624         :return: str
625         """
626         if self._sock is None:
627             self._connect()
628
629         # Send request if it is needed
630         if not self._send():
631             return None
632
633         data = self._receive()
634
635         if not self._keep_alive:
636             self._disconnect()
637
638         return data
639
640     def _check_raw_data(self, data):
641         """
642         Check if all data has been gathered from socket
643         :param data: str
644         :return: boolean
645         """
646         return True
647
648     def _parse_config(self):
649         """
650         Parse configuration data
651         :return: boolean
652         """
653         if self.name is None or self.name == str(None):
654             self.name = ""
655         else:
656             self.name = str(self.name)
657         try:
658             self.unix_socket = str(self.configuration['socket'])
659         except (KeyError, TypeError):
660             self.debug("No unix socket specified. Trying TCP/IP socket.")
661             self.unix_socket = None
662             try:
663                 self.host = str(self.configuration['host'])
664             except (KeyError, TypeError):
665                 self.debug("No host specified. Using: '" + self.host + "'")
666             try:
667                 self.port = int(self.configuration['port'])
668             except (KeyError, TypeError):
669                 self.debug("No port specified. Using: '" + str(self.port) + "'")
670         try:
671             self.request = str(self.configuration['request'])
672         except (KeyError, TypeError):
673             self.debug("No request specified. Using: '" + str(self.request) + "'")
674         self.request = self.request.encode()
675
676     def check(self):
677         self._parse_config()
678         return SimpleService.check(self)
679
680
681 class LogService(SimpleService):
682     def __init__(self, configuration=None, name=None):
683         self.log_path = ""
684         self._last_position = 0
685         # self._log_reader = None
686         SimpleService.__init__(self, configuration=configuration, name=name)
687         self.retries = 100000  # basically always retry
688
689     def _get_raw_data(self):
690         """
691         Get log lines since last poll
692         :return: list
693         """
694         lines = []
695         try:
696             if os.path.getsize(self.log_path) < self._last_position:
697                 self._last_position = 0  # read from beginning if file has shrunk
698             elif os.path.getsize(self.log_path) == self._last_position:
699                 self.debug("Log file hasn't changed. No new data.")
700                 return []  # return empty list if nothing has changed
701             with open(self.log_path, "r") as fp:
702                 fp.seek(self._last_position)
703                 for i, line in enumerate(fp):
704                     lines.append(line)
705                 self._last_position = fp.tell()
706         except Exception as e:
707             self.error(str(e))
708
709         if len(lines) != 0:
710             return lines
711         else:
712             self.error("No data collected.")
713             return None
714
715     def check(self):
716         """
717         Parse basic configuration and check if log file exists
718         :return: boolean
719         """
720         if self.name is not None or self.name != str(None):
721             self.name = ""
722         else:
723             self.name = str(self.name)
724         try:
725             self.log_path = str(self.configuration['path'])
726         except (KeyError, TypeError):
727             self.error("No path to log specified. Using: '" + self.log_path + "'")
728
729         if os.access(self.log_path, os.R_OK):
730             return True
731         else:
732             self.error("Cannot access file: '" + self.log_path + "'")
733             return False
734
735     def create(self):
736         # set cursor at last byte of log file
737         self._last_position = os.path.getsize(self.log_path)
738         status = SimpleService.create(self)
739         # self._last_position = 0
740         return status
741
742
743 class ExecutableService(SimpleService):
744     bad_substrings = ('&', '|', ';', '>', '<')
745
746     def __init__(self, configuration=None, name=None):
747         self.command = ""
748         SimpleService.__init__(self, configuration=configuration, name=name)
749
750     def _get_raw_data(self):
751         """
752         Get raw data from executed command
753         :return: str
754         """
755         try:
756             p = Popen(self.command, stdout=PIPE, stderr=PIPE)
757         except Exception as e:
758             self.error("Executing command", self.command, "resulted in error:", str(e))
759             return None
760         data = []
761         for line in p.stdout.readlines():
762             data.append(str(line.decode()))
763
764         if len(data) == 0:
765             self.error("No data collected.")
766             return None
767
768         return data
769
770     def check(self):
771         """
772         Parse basic configuration, check if command is whitelisted and is returning values
773         :return: boolean
774         """
775         if self.name is not None or self.name != str(None):
776             self.name = ""
777         else:
778             self.name = str(self.name)
779         try:
780             self.command = str(self.configuration['command'])
781         except (KeyError, TypeError):
782             self.error("No command specified. Using: '" + self.command + "'")
783         command = self.command.split(' ')
784
785         for arg in command[1:]:
786             if any(st in arg for st in self.bad_substrings):
787                 self.error("Bad command argument:" + " ".join(self.command[1:]))
788                 return False
789         # test command and search for it in /usr/sbin or /sbin when failed
790         base = command[0].split('/')[-1]
791         if self._get_raw_data() is None:
792             for prefix in ['/sbin/', '/usr/sbin/']:
793                 command[0] = prefix + base
794                 if os.path.isfile(command[0]):
795                     break
796         self.command = command
797         if self._get_data() is None or len(self._get_data()) == 0:
798             self.error("Command", self.command, "returned no data")
799             return False
800         return True