]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
attempt to fix freezes on socket service
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: netdata python modules framework
3 # Author: Pawel Krupa (paulfantom)
4
5 # Remember:
6 # ALL CODE NEEDS TO BE COMPATIBLE WITH Python > 2.7 and Python > 3.1
7 # Follow PEP8 as much as it is possible
8 # "check" and "create" CANNOT be blocking.
9 # "update" CAN be blocking
10 # "update" function needs to be fast, so follow:
11 #   https://wiki.python.org/moin/PythonSpeed/PerformanceTips
12 # basically:
13 #  - use local variables wherever it is possible
14 #  - avoid dots in expressions that are executed many times
15 #  - use "join()" instead of "+"
16 #  - use "import" only at the beginning
17 #
18 # using ".encode()" in one thread can block other threads as well (only in python2)
19
20 import time
21 # import sys
22 import os
23 import socket
24 import select
25 try:
26     import urllib.request as urllib2
27 except ImportError:
28     import urllib2
29
30 from subprocess import Popen, PIPE
31
32 import threading
33 import msg
34
35
36 # class BaseService(threading.Thread):
37 class SimpleService(threading.Thread):
38     """
39     Prototype of Service class.
40     Implemented basic functionality to run jobs by `python.d.plugin`
41     """
42     def __init__(self, configuration=None, name=None):
43         """
44         This needs to be initialized in child classes
45         :param configuration: dict
46         :param name: str
47         """
48         threading.Thread.__init__(self)
49         self._data_stream = ""
50         self.daemon = True
51         self.retries = 0
52         self.retries_left = 0
53         self.priority = 140000
54         self.update_every = 1
55         self.name = name
56         self.override_name = None
57         self.chart_name = ""
58         self._dimensions = []
59         self._charts = []
60         self.__chart_set = False
61         self.__first_run = True
62         self.order = []
63         self.definitions = {}
64         if configuration is None:
65             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
66             raise RuntimeError
67         else:
68             self._extract_base_config(configuration)
69             self.timetable = {}
70             self.create_timetable()
71
72     # --- BASIC SERVICE CONFIGURATION ---
73
74     def _extract_base_config(self, config):
75         """
76         Get basic parameters to run service
77         Minimum config:
78             config = {'update_every':1,
79                       'priority':100000,
80                       'retries':0}
81         :param config: dict
82         """
83         pop = config.pop
84         try:
85             self.override_name = pop('name')
86         except KeyError:
87             pass
88         self.update_every = int(pop('update_every'))
89         self.priority = int(pop('priority'))
90         self.retries = int(pop('retries'))
91         self.retries_left = self.retries
92         self.configuration = config
93
94     def create_timetable(self, freq=None):
95         """
96         Create service timetable.
97         `freq` is optional
98         Example:
99             timetable = {'last': 1466370091.3767564,
100                          'next': 1466370092,
101                          'freq': 1}
102         :param freq: int
103         """
104         if freq is None:
105             freq = self.update_every
106         now = time.time()
107         self.timetable = {'last': now,
108                           'next': now - (now % freq) + freq,
109                           'freq': freq}
110
111     # --- THREAD CONFIGURATION ---
112
113     def _run_once(self):
114         """
115         Executes self.update(interval) and draws run time chart.
116         Return value presents exit status of update()
117         :return: boolean
118         """
119         t_start = time.time()
120         timetable = self.timetable
121         chart_name = self.chart_name
122         # check if it is time to execute job update() function
123         if timetable['next'] > t_start:
124             self.debug(chart_name, "will be run in", str(int((timetable['next'] - t_start) * 1000)), "ms")
125             return True
126
127         since_last = int((t_start - timetable['last']) * 1000000)
128         self.debug(chart_name,
129                    "ready to run, after", str(int((t_start - timetable['last']) * 1000)),
130                    "ms (update_every:", str(timetable['freq'] * 1000),
131                    "ms, latency:", str(int((t_start - timetable['next']) * 1000)), "ms")
132         if self.__first_run:
133             since_last = 0
134         if not self.update(since_last):
135             self.error("update function failed.")
136             return False
137         t_end = time.time()
138         self.timetable['next'] = t_end - (t_end % timetable['freq']) + timetable['freq']
139         # draw performance graph
140         run_time = str(int((t_end - t_start) * 1000))
141         # noinspection SqlNoDataSourceInspection
142         print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
143               (self.chart_name, str(since_last), run_time))
144         # sys.stdout.write("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
145         #                  (self.chart_name, str(since_last), run_time))
146
147         self.debug(chart_name, "updated in", str(run_time), "ms")
148         self.timetable['last'] = t_start
149         self.__first_run = False
150         return True
151
152     def run(self):
153         """
154         Runs job in thread. Handles retries.
155         Exits when job failed or timed out.
156         :return: None
157         """
158         self.timetable['last'] = time.time()
159         while True:  # run forever, unless something is wrong
160             try:
161                 status = self._run_once()
162             except Exception as e:
163                 self.error("Something wrong: ", str(e))
164                 return
165             if status:  # handle retries if update failed
166                 time.sleep(self.timetable['next'] - time.time())
167                 self.retries_left = self.retries
168             else:
169                 self.retries_left -= 1
170                 if self.retries_left <= 0:
171                     self.error("no more retries. Exiting")
172                     return
173                 else:
174                     time.sleep(self.timetable['freq'])
175
176     # --- CHART ---
177
178     @staticmethod
179     def _format(*args):
180         """
181         Escape and convert passed arguments.
182         :param args: anything
183         :return: list
184         """
185         params = []
186         append = params.append
187         for p in args:
188             if p is None:
189                 append(p)
190                 continue
191             if type(p) is not str:
192                 p = str(p)
193             if ' ' in p:
194                 p = "'" + p + "'"
195             append(p)
196         return params
197
198     def _line(self, instruction, *params):
199         """
200         Converts *params to string and joins them with one space between every one.
201         Result is appended to self._data_stream
202         :param params: str/int/float
203         """
204         tmp = list(map((lambda x: "''" if x is None or len(x) == 0 else x), params))
205         self._data_stream += "%s %s\n" % (instruction, str(" ".join(tmp)))
206
207     def chart(self, type_id, name="", title="", units="", family="",
208               category="", chart_type="line", priority="", update_every=""):
209         """
210         Defines a new chart.
211         :param type_id: str
212         :param name: str
213         :param title: str
214         :param units: str
215         :param family: str
216         :param category: str
217         :param chart_type: str
218         :param priority: int/str
219         :param update_every: int/str
220         """
221         self._charts.append(type_id)
222
223         p = self._format(type_id, name, title, units, family, category, chart_type, priority, update_every)
224         self._line("CHART", *p)
225
226     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
227         """
228         Defines a new dimension for the chart
229         :param id: str
230         :param name: str
231         :param algorithm: str
232         :param multiplier: int/str
233         :param divisor: int/str
234         :param hidden: boolean
235         :return:
236         """
237         try:
238             int(multiplier)
239         except TypeError:
240             self.error("malformed dimension: multiplier is not a number:", multiplier)
241             multiplier = 1
242         try:
243             int(divisor)
244         except TypeError:
245             self.error("malformed dimension: divisor is not a number:", divisor)
246             divisor = 1
247         if name is None:
248             name = id
249         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
250             algorithm = "absolute"
251
252         self._dimensions.append(str(id))
253         if hidden:
254             p = self._format(id, name, algorithm, multiplier, divisor, "hidden")
255         else:
256             p = self._format(id, name, algorithm, multiplier, divisor)
257
258         self._line("DIMENSION", *p)
259
260     def begin(self, type_id, microseconds=0):
261         """
262         Begin data set
263         :param type_id: str
264         :param microseconds: int
265         :return: boolean
266         """
267         if type_id not in self._charts:
268             self.error("wrong chart type_id:", type_id)
269             return False
270         try:
271             int(microseconds)
272         except TypeError:
273             self.error("malformed begin statement: microseconds are not a number:", microseconds)
274             microseconds = ""
275
276         self._line("BEGIN", type_id, str(microseconds))
277         return True
278
279     def set(self, id, value):
280         """
281         Set value to dimension
282         :param id: str
283         :param value: int/float
284         :return: boolean
285         """
286         if id not in self._dimensions:
287             self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions)
288             return False
289         try:
290             value = str(int(value))
291         except TypeError:
292             self.error("cannot set non-numeric value:", str(value))
293             return False
294         self._line("SET", id, "=", str(value))
295         self.__chart_set = True
296         return True
297
298     def end(self):
299         if self.__chart_set:
300             self._line("END")
301             self.__chart_set = False
302         else:
303             pos = self._data_stream.rfind("BEGIN")
304             self._data_stream = self._data_stream[:pos]
305
306     def commit(self):
307         """
308         Upload new data to netdata.
309         """
310         print(self._data_stream)
311         self._data_stream = ""
312
313     # --- ERROR HANDLING ---
314
315     def error(self, *params):
316         """
317         Show error message on stderr
318         """
319         msg.error(self.chart_name, *params)
320
321     def debug(self, *params):
322         """
323         Show debug message on stderr
324         """
325         msg.debug(self.chart_name, *params)
326
327     def info(self, *params):
328         """
329         Show information message on stderr
330         """
331         msg.info(self.chart_name, *params)
332
333     # --- MAIN METHODS ---
334
335     def _get_data(self):
336         """
337         Get some data
338         :return: dict
339         """
340         return {}
341
342     def check(self):
343         """
344         check() prototype
345         :return: boolean
346         """
347         self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.")
348         if self._get_data() is None or len(self._get_data()) == 0:
349             return False
350         else:
351             return True
352
353     def create(self):
354         """
355         Create charts
356         :return: boolean
357         """
358         data = self._get_data()
359         if data is None:
360             return False
361
362         idx = 0
363         for name in self.order:
364             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
365             self.chart(self.chart_name + "." + name, *options)
366             # check if server has this datapoint
367             for line in self.definitions[name]['lines']:
368                 if line[0] in data:
369                     self.dimension(*line)
370             idx += 1
371
372         self.commit()
373         return True
374
375     def update(self, interval):
376         """
377         Update charts
378         :param interval: int
379         :return: boolean
380         """
381         data = self._get_data()
382         if data is None:
383             self.debug("_get_data() returned no data")
384             return False
385
386         updated = False
387         for chart in self.order:
388             if self.begin(self.chart_name + "." + chart, interval):
389                 updated = True
390                 for dim in self.definitions[chart]['lines']:
391                     try:
392                         self.set(dim[0], data[dim[0]])
393                     except KeyError:
394                         pass
395                 self.end()
396
397         self.commit()
398         if not updated:
399             self.error("no charts to update")
400
401         return updated
402
403
404 class UrlService(SimpleService):
405     # TODO add support for https connections
406     def __init__(self, configuration=None, name=None):
407         self.url = ""
408         self.user = None
409         self.password = None
410         self.proxies = {}
411         SimpleService.__init__(self, configuration=configuration, name=name)
412
413     def __add_openers(self):
414         # TODO add error handling
415         self.opener = urllib2.build_opener()
416
417         # Proxy handling
418         # TODO currently self.proxies isn't parsed from configuration file
419         # if len(self.proxies) > 0:
420         #     for proxy in self.proxies:
421         #         url = proxy['url']
422         #         # TODO test this:
423         #         if "user" in proxy and "pass" in proxy:
424         #             if url.lower().startswith('https://'):
425         #                 url = 'https://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[8:]
426         #             else:
427         #                 url = 'http://' + proxy['user'] + ':' + proxy['pass'] + '@' + url[7:]
428         #         # FIXME move proxy auth to sth like this:
429         #         #     passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
430         #         #     passman.add_password(None, url, proxy['user'], proxy['password'])
431         #         #     opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
432         #
433         #         if url.lower().startswith('https://'):
434         #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
435         #         else:
436         #             opener.add_handler(urllib2.ProxyHandler({'https': url}))
437
438         # HTTP Basic Auth
439         if self.user is not None and self.password is not None:
440             passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
441             passman.add_password(None, self.url, self.user, self.password)
442             self.opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
443             self.debug("Enabling HTTP basic auth")
444
445         #urllib2.install_opener(opener)
446
447     def _get_raw_data(self):
448         """
449         Get raw data from http request
450         :return: str
451         """
452         raw = None
453         try:
454             f = self.opener.open(self.url, timeout=self.update_every * 2)
455             # f = urllib2.urlopen(self.url, timeout=self.update_every * 2)
456         except Exception as e:
457             self.error(str(e))
458             return None
459
460         try:
461             raw = f.read().decode('utf-8')
462         except Exception as e:
463             self.error(str(e))
464         finally:
465             f.close()
466         return raw
467
468     def check(self):
469         """
470         Format configuration data and try to connect to server
471         :return: boolean
472         """
473         if self.name is None or self.name == str(None):
474             self.name = 'local'
475             self.chart_name += "_" + self.name
476         else:
477             self.name = str(self.name)
478         try:
479             self.url = str(self.configuration['url'])
480         except (KeyError, TypeError):
481             pass
482         try:
483             self.user = str(self.configuration['user'])
484         except (KeyError, TypeError):
485             pass
486         try:
487             self.password = str(self.configuration['pass'])
488         except (KeyError, TypeError):
489             pass
490
491         self.__add_openers()
492
493         test = self._get_data()
494         if test is None or len(test) == 0:
495             return False
496         else:
497             return True
498
499
500 class SocketService(SimpleService):
501     def __init__(self, configuration=None, name=None):
502         self._sock = None
503         self._keep_alive = False
504         self.host = "localhost"
505         self.port = None
506         self.unix_socket = None
507         self.request = ""
508         self.__socket_config = None
509         SimpleService.__init__(self, configuration=configuration, name=name)
510
511     def _connect(self):
512         """
513         Recreate socket and connect to it since sockets cannot be reused after closing
514         Available configurations are IPv6, IPv4 or UNIX socket
515         :return:
516         """
517         try:
518             if self.unix_socket is None:
519                 if self.__socket_config is None:
520                     # establish ipv6 or ipv4 connection.
521                     for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
522                         self.debug("connecting socket to host '" + str(self.host) + "', port " + str(self.port))
523                         try:
524                             # noinspection SpellCheckingInspection
525                             af, socktype, proto, canonname, sa = res
526                             self._sock = socket.socket(af, socktype, proto)
527                         except socket.error as e:
528                             self.debug("Cannot create socket:", str(e))
529                             self._sock = None
530                             continue
531                         try:
532                             self._sock.connect(sa)
533                         except socket.error as e:
534                             self.debug("Cannot connect to socket:", str(e))
535                             self._disconnect()
536                             continue
537                         self.__socket_config = res
538                         break
539                 else:
540                     # connect to socket with previously established configuration
541                     try:
542                         af, socktype, proto, canonname, sa = self.__socket_config
543                         self._sock = socket.socket(af, socktype, proto)
544                         self._sock.connect(sa)
545                     except socket.error as e:
546                         self.debug("Cannot create or connect to socket:", str(e))
547                         self._disconnect()
548             else:
549                 # connect to unix socket
550                 try:
551                     self.debug("connecting unix socket '" + str(self.unix_socket) + "'")
552                     self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
553                     self._sock.connect(self.unix_socket)
554                 except socket.error:
555                     self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
556                     self._sock.connect(self.unix_socket)
557
558         except Exception as e:
559             self.error(str(e),
560                        "Cannot create socket with following configuration: host:", str(self.host),
561                        "port:", str(self.port),
562                        "socket:", str(self.unix_socket))
563             self._sock = None
564
565         if self._sock is not None:
566             self._sock.setblocking(0)
567             self._sock.settimeout(5)
568             self.debug("connected with timeout " + str(self._sock.gettimeout()))
569         else:
570             self.debug("not connected")
571
572     def _disconnect(self):
573         """
574         Close socket connection
575         :return:
576         """
577         if self._sock is not None:
578             try:
579                 self.debug("disconnecting")
580                 self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
581                 self._sock.close()
582             except Exception:
583                 pass
584             self._sock = None
585
586     def _send(self):
587         """
588         Send request.
589         :return: boolean
590         """
591         # Send request if it is needed
592         if self.request != "".encode():
593             try:
594                 self.debug("sending request")
595                 self._sock.send(self.request)
596             except Exception as e:
597                 self._disconnect()
598                 self.error(str(e),
599                            "used configuration: host:", str(self.host),
600                            "port:", str(self.port),
601                            "socket:", str(self.unix_socket))
602                 return False
603         return True
604
605     def _receive(self):
606         """
607         Receive data from socket
608         :return: str
609         """
610         data = ""
611         while True:
612             try:
613                 self.debug("receiving response")
614                 ready_to_read, _, in_error = select.select([self._sock], [], [], 5)
615             except Exception as e:
616                 self.error("timeout while waiting for response:", str(e))
617                 self._disconnect()
618                 break
619             if len(ready_to_read) > 0:
620                 buf = self._sock.recv(4096)
621                 if buf is None or len(buf) == 0:  # handle server disconnect
622                     self._disconnect()
623                     break
624                 data += buf.decode(errors='ignore')
625                 if self._check_raw_data(data):
626                     break
627             else:
628                 self.error("Socket timed out.")
629                 self._disconnect()
630                 break
631
632         return data
633
634     def _get_raw_data(self):
635         """
636         Get raw data with low-level "socket" module.
637         :return: str
638         """
639         if self._sock is None:
640             self._connect()
641
642         # Send request if it is needed
643         if not self._send():
644             return None
645
646         data = self._receive()
647
648         if not self._keep_alive:
649             self._disconnect()
650
651         return data
652
653     def _check_raw_data(self, data):
654         """
655         Check if all data has been gathered from socket
656         :param data: str
657         :return: boolean
658         """
659         return True
660
661     def _parse_config(self):
662         """
663         Parse configuration data
664         :return: boolean
665         """
666         if self.name is None or self.name == str(None):
667             self.name = ""
668         else:
669             self.name = str(self.name)
670         try:
671             self.unix_socket = str(self.configuration['socket'])
672         except (KeyError, TypeError):
673             self.debug("No unix socket specified. Trying TCP/IP socket.")
674             self.unix_socket = None
675             try:
676                 self.host = str(self.configuration['host'])
677             except (KeyError, TypeError):
678                 self.debug("No host specified. Using: '" + self.host + "'")
679             try:
680                 self.port = int(self.configuration['port'])
681             except (KeyError, TypeError):
682                 self.debug("No port specified. Using: '" + str(self.port) + "'")
683         try:
684             self.request = str(self.configuration['request'])
685         except (KeyError, TypeError):
686             self.debug("No request specified. Using: '" + str(self.request) + "'")
687         self.request = self.request.encode()
688
689     def check(self):
690         self._parse_config()
691         return SimpleService.check(self)
692
693
694 class LogService(SimpleService):
695     def __init__(self, configuration=None, name=None):
696         self.log_path = ""
697         self._last_position = 0
698         # self._log_reader = None
699         SimpleService.__init__(self, configuration=configuration, name=name)
700         self.retries = 100000  # basically always retry
701
702     def _get_raw_data(self):
703         """
704         Get log lines since last poll
705         :return: list
706         """
707         lines = []
708         try:
709             if os.path.getsize(self.log_path) < self._last_position:
710                 self._last_position = 0  # read from beginning if file has shrunk
711             elif os.path.getsize(self.log_path) == self._last_position:
712                 self.debug("Log file hasn't changed. No new data.")
713                 return []  # return empty list if nothing has changed
714             with open(self.log_path, "r") as fp:
715                 fp.seek(self._last_position)
716                 for i, line in enumerate(fp):
717                     lines.append(line)
718                 self._last_position = fp.tell()
719         except Exception as e:
720             self.error(str(e))
721
722         if len(lines) != 0:
723             return lines
724         else:
725             self.error("No data collected.")
726             return None
727
728     def check(self):
729         """
730         Parse basic configuration and check if log file exists
731         :return: boolean
732         """
733         if self.name is not None or self.name != str(None):
734             self.name = ""
735         else:
736             self.name = str(self.name)
737         try:
738             self.log_path = str(self.configuration['path'])
739         except (KeyError, TypeError):
740             self.error("No path to log specified. Using: '" + self.log_path + "'")
741
742         if os.access(self.log_path, os.R_OK):
743             return True
744         else:
745             self.error("Cannot access file: '" + self.log_path + "'")
746             return False
747
748     def create(self):
749         # set cursor at last byte of log file
750         self._last_position = os.path.getsize(self.log_path)
751         status = SimpleService.create(self)
752         # self._last_position = 0
753         return status
754
755
756 class ExecutableService(SimpleService):
757     bad_substrings = ('&', '|', ';', '>', '<')
758
759     def __init__(self, configuration=None, name=None):
760         self.command = ""
761         SimpleService.__init__(self, configuration=configuration, name=name)
762
763     def _get_raw_data(self):
764         """
765         Get raw data from executed command
766         :return: str
767         """
768         try:
769             p = Popen(self.command, stdout=PIPE, stderr=PIPE)
770         except Exception as e:
771             self.error("Executing command", self.command, "resulted in error:", str(e))
772             return None
773         data = []
774         for line in p.stdout.readlines():
775             data.append(str(line.decode()))
776
777         if len(data) == 0:
778             self.error("No data collected.")
779             return None
780
781         return data
782
783     def check(self):
784         """
785         Parse basic configuration, check if command is whitelisted and is returning values
786         :return: boolean
787         """
788         if self.name is not None or self.name != str(None):
789             self.name = ""
790         else:
791             self.name = str(self.name)
792         try:
793             self.command = str(self.configuration['command'])
794         except (KeyError, TypeError):
795             self.error("No command specified. Using: '" + self.command + "'")
796         command = self.command.split(' ')
797
798         for arg in command[1:]:
799             if any(st in arg for st in self.bad_substrings):
800                 self.error("Bad command argument:" + " ".join(self.command[1:]))
801                 return False
802         # test command and search for it in /usr/sbin or /sbin when failed
803         base = command[0].split('/')[-1]
804         if self._get_raw_data() is None:
805             for prefix in ['/sbin/', '/usr/sbin/']:
806                 command[0] = prefix + base
807                 if os.path.isfile(command[0]):
808                     break
809         self.command = command
810         if self._get_data() is None or len(self._get_data()) == 0:
811             self.error("Command", self.command, "returned no data")
812             return False
813         return True