]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
UNIX socket fallback to SOCK_STREAM when SOCK_DGRAM is not available
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: netdata python modules framework
3 # Author: Pawel Krupa (paulfantom)
4
5 # Remember:
6 # ALL CODE NEEDS TO BE COMPATIBLE WITH Python > 2.7 and Python > 3.1
7 # Follow PEP8 as much as it is possible
8 # "check" and "create" CANNOT be blocking.
9 # "update" CAN be blocking
10 # "update" function needs to be fast, so follow:
11 #   https://wiki.python.org/moin/PythonSpeed/PerformanceTips
12 # basically:
13 #  - use local variables wherever it is possible
14 #  - avoid dots in expressions that are executed many times
15 #  - use "join()" instead of "+"
16 #  - use "import" only at the beginning
17 #
18 # using ".encode()" in one thread can block other threads as well (only in python2)
19
20 import time
21 # import sys
22 import os
23 import socket
24 import select
25 try:
26     import urllib.request as urllib2
27 except ImportError:
28     import urllib2
29
30 from subprocess import Popen, PIPE
31
32 import threading
33 import msg
34
35
36 # class BaseService(threading.Thread):
37 class SimpleService(threading.Thread):
38     """
39     Prototype of Service class.
40     Implemented basic functionality to run jobs by `python.d.plugin`
41     """
42     def __init__(self, configuration=None, name=None):
43         """
44         This needs to be initialized in child classes
45         :param configuration: dict
46         :param name: str
47         """
48         threading.Thread.__init__(self)
49         self._data_stream = ""
50         self.daemon = True
51         self.retries = 0
52         self.retries_left = 0
53         self.priority = 140000
54         self.update_every = 1
55         self.name = name
56         self.override_name = None
57         self.chart_name = ""
58         self._dimensions = []
59         self._charts = []
60         self.__chart_set = False
61         self.__first_run = True
62         self.order = []
63         self.definitions = {}
64         if configuration is None:
65             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
66             raise RuntimeError
67         else:
68             self._extract_base_config(configuration)
69             self.timetable = {}
70             self.create_timetable()
71
72     # --- BASIC SERVICE CONFIGURATION ---
73
74     def _extract_base_config(self, config):
75         """
76         Get basic parameters to run service
77         Minimum config:
78             config = {'update_every':1,
79                       'priority':100000,
80                       'retries':0}
81         :param config: dict
82         """
83         pop = config.pop
84         try:
85             self.override_name = pop('name')
86         except KeyError:
87             pass
88         self.update_every = int(pop('update_every'))
89         self.priority = int(pop('priority'))
90         self.retries = int(pop('retries'))
91         self.retries_left = self.retries
92         self.configuration = config
93
94     def create_timetable(self, freq=None):
95         """
96         Create service timetable.
97         `freq` is optional
98         Example:
99             timetable = {'last': 1466370091.3767564,
100                          'next': 1466370092,
101                          'freq': 1}
102         :param freq: int
103         """
104         if freq is None:
105             freq = self.update_every
106         now = time.time()
107         self.timetable = {'last': now,
108                           'next': now - (now % freq) + freq,
109                           'freq': freq}
110
111     # --- THREAD CONFIGURATION ---
112
113     def _run_once(self):
114         """
115         Executes self.update(interval) and draws run time chart.
116         Return value presents exit status of update()
117         :return: boolean
118         """
119         t_start = time.time()
120         timetable = self.timetable
121         chart_name = self.chart_name
122         # check if it is time to execute job update() function
123         if timetable['next'] > t_start:
124             self.debug(chart_name, "will be run in", str(int((timetable['next'] - t_start) * 1000)), "ms")
125             return True
126
127         since_last = int((t_start - timetable['last']) * 1000000)
128         self.debug(chart_name,
129                    "ready to run, after", str(int((t_start - timetable['last']) * 1000)),
130                    "ms (update_every:", str(timetable['freq'] * 1000),
131                    "ms, latency:", str(int((t_start - timetable['next']) * 1000)), "ms")
132         if self.__first_run:
133             since_last = 0
134         if not self.update(since_last):
135             self.error("update function failed.")
136             return False
137         t_end = time.time()
138         self.timetable['next'] = t_end - (t_end % timetable['freq']) + timetable['freq']
139         # draw performance graph
140         run_time = str(int((t_end - t_start) * 1000))
141         # noinspection SqlNoDataSourceInspection
142         print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
143               (self.chart_name, str(since_last), run_time))
144         # sys.stdout.write("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
145         #                  (self.chart_name, str(since_last), run_time))
146
147         self.debug(chart_name, "updated in", str(run_time), "ms")
148         self.timetable['last'] = t_start
149         self.__first_run = False
150         return True
151
152     def run(self):
153         """
154         Runs job in thread. Handles retries.
155         Exits when job failed or timed out.
156         :return: None
157         """
158         self.timetable['last'] = time.time()
159         while True:  # run forever, unless something is wrong
160             try:
161                 status = self._run_once()
162             except Exception as e:
163                 self.error("Something wrong: ", str(e))
164                 return
165             if status:  # handle retries if update failed
166                 time.sleep(self.timetable['next'] - time.time())
167                 self.retries_left = self.retries
168             else:
169                 self.retries_left -= 1
170                 if self.retries_left <= 0:
171                     self.error("no more retries. Exiting")
172                     return
173                 else:
174                     time.sleep(self.timetable['freq'])
175
176     # --- CHART ---
177
178     @staticmethod
179     def _format(*args):
180         """
181         Escape and convert passed arguments.
182         :param args: anything
183         :return: list
184         """
185         params = []
186         append = params.append
187         for p in args:
188             if p is None:
189                 append(p)
190                 continue
191             if type(p) is not str:
192                 p = str(p)
193             if ' ' in p:
194                 p = "'" + p + "'"
195             append(p)
196         return params
197
198     def _line(self, instruction, *params):
199         """
200         Converts *params to string and joins them with one space between every one.
201         Result is appended to self._data_stream
202         :param params: str/int/float
203         """
204         tmp = list(map((lambda x: "''" if x is None or len(x) == 0 else x), params))
205         self._data_stream += "%s %s\n" % (instruction, str(" ".join(tmp)))
206
207     def chart(self, type_id, name="", title="", units="", family="",
208               category="", chart_type="line", priority="", update_every=""):
209         """
210         Defines a new chart.
211         :param type_id: str
212         :param name: str
213         :param title: str
214         :param units: str
215         :param family: str
216         :param category: str
217         :param chart_type: str
218         :param priority: int/str
219         :param update_every: int/str
220         """
221         self._charts.append(type_id)
222
223         p = self._format(type_id, name, title, units, family, category, chart_type, priority, update_every)
224         self._line("CHART", *p)
225
226     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
227         """
228         Defines a new dimension for the chart
229         :param id: str
230         :param name: str
231         :param algorithm: str
232         :param multiplier: int/str
233         :param divisor: int/str
234         :param hidden: boolean
235         :return:
236         """
237         try:
238             int(multiplier)
239         except TypeError:
240             self.error("malformed dimension: multiplier is not a number:", multiplier)
241             multiplier = 1
242         try:
243             int(divisor)
244         except TypeError:
245             self.error("malformed dimension: divisor is not a number:", divisor)
246             divisor = 1
247         if name is None:
248             name = id
249         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
250             algorithm = "absolute"
251
252         self._dimensions.append(str(id))
253         if hidden:
254             p = self._format(id, name, algorithm, multiplier, divisor, "hidden")
255         else:
256             p = self._format(id, name, algorithm, multiplier, divisor)
257
258         self._line("DIMENSION", *p)
259
260     def begin(self, type_id, microseconds=0):
261         """
262         Begin data set
263         :param type_id: str
264         :param microseconds: int
265         :return: boolean
266         """
267         if type_id not in self._charts:
268             self.error("wrong chart type_id:", type_id)
269             return False
270         try:
271             int(microseconds)
272         except TypeError:
273             self.error("malformed begin statement: microseconds are not a number:", microseconds)
274             microseconds = ""
275
276         self._line("BEGIN", type_id, str(microseconds))
277         return True
278
279     def set(self, id, value):
280         """
281         Set value to dimension
282         :param id: str
283         :param value: int/float
284         :return: boolean
285         """
286         if id not in self._dimensions:
287             self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions)
288             return False
289         try:
290             value = str(int(value))
291         except TypeError:
292             self.error("cannot set non-numeric value:", str(value))
293             return False
294         self._line("SET", id, "=", str(value))
295         self.__chart_set = True
296         return True
297
298     def end(self):
299         if self.__chart_set:
300             self._line("END")
301             self.__chart_set = False
302         else:
303             pos = self._data_stream.rfind("BEGIN")
304             self._data_stream = self._data_stream[:pos]
305
306     def commit(self):
307         """
308         Upload new data to netdata.
309         """
310         print(self._data_stream)
311         self._data_stream = ""
312
313     # --- ERROR HANDLING ---
314
315     def error(self, *params):
316         """
317         Show error message on stderr
318         """
319         msg.error(self.chart_name, *params)
320
321     def debug(self, *params):
322         """
323         Show debug message on stderr
324         """
325         msg.debug(self.chart_name, *params)
326
327     def info(self, *params):
328         """
329         Show information message on stderr
330         """
331         msg.info(self.chart_name, *params)
332
333     # --- MAIN METHODS ---
334
335     def _get_data(self):
336         """
337         Get some data
338         :return: dict
339         """
340         return {}
341
342     def check(self):
343         """
344         check() prototype
345         :return: boolean
346         """
347         self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.")
348         if self._get_data() is None or len(self._get_data()) == 0:
349             return False
350         else:
351             return True
352
353     def create(self):
354         """
355         Create charts
356         :return: boolean
357         """
358         data = self._get_data()
359         if data is None:
360             return False
361
362         idx = 0
363         for name in self.order:
364             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
365             self.chart(self.chart_name + "." + name, *options)
366             # check if server has this datapoint
367             for line in self.definitions[name]['lines']:
368                 if line[0] in data:
369                     self.dimension(*line)
370             idx += 1
371
372         self.commit()
373         return True
374
375     def update(self, interval):
376         """
377         Update charts
378         :param interval: int
379         :return: boolean
380         """
381         data = self._get_data()
382         if data is None:
383             self.debug("_get_data() returned no data")
384             return False
385
386         updated = False
387         for chart in self.order:
388             if self.begin(self.chart_name + "." + chart, interval):
389                 updated = True
390                 for dim in self.definitions[chart]['lines']:
391                     try:
392                         self.set(dim[0], data[dim[0]])
393                     except KeyError:
394                         pass
395                 self.end()
396
397         self.commit()
398         if not updated:
399             self.error("no charts to update")
400
401         return updated
402
403
404 class UrlService(SimpleService):
405     # TODO add support for https connections
406     def __init__(self, configuration=None, name=None):
407         self.url = ""
408         self.user = None
409         self.password = None
410         self.proxies = {}
411         SimpleService.__init__(self, configuration=configuration, name=name)
412
413     def __add_openers(self):
414         # TODO add error handling
415         opener = urllib2.build_opener()
416
417         # Proxy handling
418         # TODO currently self.proxies isn't parsed from configuration file
419         # if len(self.proxies) > 0:
420         #     for proxy in self.proxies:
421         #         url = proxy['url']
422         #         # TODO test this:
423         #         if "user" in proxy and "pass" in proxy:
424         #             if url.lower().startswith('https://'):
425         #                 url = 'https://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[8:]
426         #             else:
427         #                 url = 'http://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[7:]
428         #         # FIXME move proxy auth to sth like this:
429         #         #     passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
430         #         #     passman.add_password(None, url, proxy['user'], proxy['password'])
431         #         #     opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
432         #
433         #         if url.lower().startswith('https://'):
434         #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
435         #         else:
436         #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
437
438         # HTTP Basic Auth
439         if self.user is not None and self.password is not None:
440             passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
441             passman.add_password(None, self.url, self.user, self.password)
442             opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
443
444         urllib2.install_opener(opener)
445
446     def _get_raw_data(self):
447         """
448         Get raw data from http request
449         :return: str
450         """
451         raw = None
452         try:
453             f = urllib2.urlopen(self.url, timeout=self.update_every * 2)
454         except Exception as e:
455             self.error(str(e))
456             return None
457
458         try:
459             raw = f.read().decode('utf-8')
460         except Exception as e:
461             self.error(str(e))
462         finally:
463             f.close()
464         return raw
465
466     def check(self):
467         """
468         Format configuration data and try to connect to server
469         :return: boolean
470         """
471         if self.name is None or self.name == str(None):
472             self.name = 'local'
473             self.chart_name += "_" + self.name
474         else:
475             self.name = str(self.name)
476         try:
477             self.url = str(self.configuration['url'])
478         except (KeyError, TypeError):
479             pass
480         try:
481             self.user = str(self.configuration['user'])
482         except (KeyError, TypeError):
483             pass
484         try:
485             self.password = str(self.configuration['pass'])
486         except (KeyError, TypeError):
487             pass
488
489         self.__add_openers()
490
491         if self._get_data() is None or len(self._get_data()) == 0:
492             return False
493         else:
494             return True
495
496
497 class SocketService(SimpleService):
498     def __init__(self, configuration=None, name=None):
499         self._sock = None
500         self._keep_alive = False
501         self.host = "localhost"
502         self.port = None
503         self.unix_socket = None
504         self.request = ""
505         self.__socket_config = None
506         SimpleService.__init__(self, configuration=configuration, name=name)
507
508     def _connect(self):
509         """
510         Recreate socket and connect to it since sockets cannot be reused after closing
511         Available configurations are IPv6, IPv4 or UNIX socket
512         :return:
513         """
514         try:
515             if self.unix_socket is None:
516                 if self.__socket_config is None:
517                     # establish ipv6 or ipv4 connection.
518                     for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
519                         try:
520                             # noinspection SpellCheckingInspection
521                             af, socktype, proto, canonname, sa = res
522                             self._sock = socket.socket(af, socktype, proto)
523                         except socket.error as e:
524                             self.debug("Cannot create socket:", str(e))
525                             self._sock = None
526                             continue
527                         try:
528                             self._sock.connect(sa)
529                         except socket.error as e:
530                             self.debug("Cannot connect to socket:", str(e))
531                             self._disconnect()
532                             continue
533                         self.__socket_config = res
534                         break
535                 else:
536                     # connect to socket with previously established configuration
537                     try:
538                         af, socktype, proto, canonname, sa = self.__socket_config
539                         self._sock = socket.socket(af, socktype, proto)
540                         self._sock.connect(sa)
541                     except socket.error as e:
542                         self.debug("Cannot create or connect to socket:", str(e))
543                         self._disconnect()
544             else:
545                 # connect to unix socket
546                 try:
547                     self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
548                     self._sock.connect(self.unix_socket)
549                 except socket.error:
550                     self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
551                     self._sock.connect(self.unix_socket)
552
553         except Exception as e:
554             self.error(str(e),
555                        "Cannot create socket with following configuration: host:", str(self.host),
556                        "port:", str(self.port),
557                        "socket:", str(self.unix_socket))
558             self._sock = None
559         self._sock.setblocking(0)
560
561     def _disconnect(self):
562         """
563         Close socket connection
564         :return:
565         """
566         try:
567             self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
568             self._sock.close()
569         except Exception:
570             pass
571         self._sock = None
572
573     def _send(self):
574         """
575         Send request.
576         :return: boolean
577         """
578         # Send request if it is needed
579         if self.request != "".encode():
580             try:
581                 self._sock.send(self.request)
582             except Exception as e:
583                 self._disconnect()
584                 self.error(str(e),
585                            "used configuration: host:", str(self.host),
586                            "port:", str(self.port),
587                            "socket:", str(self.unix_socket))
588                 return False
589         return True
590
591     def _receive(self):
592         """
593         Receive data from socket
594         :return: str
595         """
596         data = ""
597         while True:
598             try:
599                 ready_to_read, _, in_error = select.select([self._sock], [], [], 15)
600             except Exception as e:
601                 self.debug("SELECT", str(e))
602                 self._disconnect()
603                 break
604             if len(ready_to_read) > 0:
605                 buf = self._sock.recv(4096)
606                 if len(buf) == 0 or buf is None:  # handle server disconnect
607                     break
608                 data += buf.decode()
609                 if self._check_raw_data(data):
610                     break
611             else:
612                 self.error("Socket timed out.")
613                 self._disconnect()
614                 break
615
616         return data
617
618     def _get_raw_data(self):
619         """
620         Get raw data with low-level "socket" module.
621         :return: str
622         """
623         if self._sock is None:
624             self._connect()
625
626         # Send request if it is needed
627         if not self._send():
628             return None
629
630         data = self._receive()
631
632         if not self._keep_alive:
633             self._disconnect()
634
635         return data
636
637     def _check_raw_data(self, data):
638         """
639         Check if all data has been gathered from socket
640         :param data: str
641         :return: boolean
642         """
643         return True
644
645     def _parse_config(self):
646         """
647         Parse configuration data
648         :return: boolean
649         """
650         if self.name is None or self.name == str(None):
651             self.name = ""
652         else:
653             self.name = str(self.name)
654         try:
655             self.unix_socket = str(self.configuration['socket'])
656         except (KeyError, TypeError):
657             self.debug("No unix socket specified. Trying TCP/IP socket.")
658             try:
659                 self.host = str(self.configuration['host'])
660             except (KeyError, TypeError):
661                 self.debug("No host specified. Using: '" + self.host + "'")
662             try:
663                 self.port = int(self.configuration['port'])
664             except (KeyError, TypeError):
665                 self.debug("No port specified. Using: '" + str(self.port) + "'")
666         try:
667             self.request = str(self.configuration['request'])
668         except (KeyError, TypeError):
669             self.debug("No request specified. Using: '" + str(self.request) + "'")
670         self.request = self.request.encode()
671
672
673 class LogService(SimpleService):
674     def __init__(self, configuration=None, name=None):
675         self.log_path = ""
676         self._last_position = 0
677         # self._log_reader = None
678         SimpleService.__init__(self, configuration=configuration, name=name)
679         self.retries = 100000  # basically always retry
680
681     def _get_raw_data(self):
682         """
683         Get log lines since last poll
684         :return: list
685         """
686         lines = []
687         try:
688             if os.path.getsize(self.log_path) < self._last_position:
689                 self._last_position = 0  # read from beginning if file has shrunk
690             elif os.path.getsize(self.log_path) == self._last_position:
691                 self.debug("Log file hasn't changed. No new data.")
692                 return []  # return empty list if nothing has changed
693             with open(self.log_path, "r") as fp:
694                 fp.seek(self._last_position)
695                 for i, line in enumerate(fp):
696                     lines.append(line)
697                 self._last_position = fp.tell()
698         except Exception as e:
699             self.error(str(e))
700
701         if len(lines) != 0:
702             return lines
703         else:
704             self.error("No data collected.")
705             return None
706
707     def check(self):
708         """
709         Parse basic configuration and check if log file exists
710         :return: boolean
711         """
712         if self.name is not None or self.name != str(None):
713             self.name = ""
714         else:
715             self.name = str(self.name)
716         try:
717             self.log_path = str(self.configuration['path'])
718         except (KeyError, TypeError):
719             self.error("No path to log specified. Using: '" + self.log_path + "'")
720
721         if os.access(self.log_path, os.R_OK):
722             return True
723         else:
724             self.error("Cannot access file: '" + self.log_path + "'")
725             return False
726
727     def create(self):
728         # set cursor at last byte of log file
729         self._last_position = os.path.getsize(self.log_path)
730         status = SimpleService.create(self)
731         # self._last_position = 0
732         return status
733
734
735 class ExecutableService(SimpleService):
736     bad_substrings = ('&', '|', ';', '>', '<')
737
738     def __init__(self, configuration=None, name=None):
739         self.command = ""
740         SimpleService.__init__(self, configuration=configuration, name=name)
741
742     def _get_raw_data(self):
743         """
744         Get raw data from executed command
745         :return: str
746         """
747         try:
748             p = Popen(self.command, stdout=PIPE, stderr=PIPE)
749         except Exception as e:
750             self.error("Executing command", self.command, "resulted in error:", str(e))
751             return None
752         data = []
753         for line in p.stdout.readlines():
754             data.append(str(line.decode()))
755
756         if len(data) == 0:
757             self.error("No data collected.")
758             return None
759
760         return data
761
762     def check(self):
763         """
764         Parse basic configuration, check if command is whitelisted and is returning values
765         :return: boolean
766         """
767         if self.name is not None or self.name != str(None):
768             self.name = ""
769         else:
770             self.name = str(self.name)
771         try:
772             self.command = str(self.configuration['command'])
773         except (KeyError, TypeError):
774             self.error("No command specified. Using: '" + self.command + "'")
775         self.command = self.command.split(' ')
776
777         for arg in self.command[1:]:
778             if any(st in arg for st in self.bad_substrings):
779                 self.error("Bad command argument:" + " ".join(self.command[1:]))
780                 return False
781         # test command and search for it in /usr/sbin or /sbin when failed
782         base = self.command[0].split('/')[-1]
783         if self._get_raw_data() is None:
784             for prefix in ['/sbin/', '/usr/sbin/']:
785                 self.command[0] = prefix + base
786                 if os.path.isfile(self.command[0]):
787                     break
788
789         if self._get_data() is None or len(self._get_data()) == 0:
790             self.error("Command", self.command, "returned no data")
791             return False
792         return True