]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
do not update chart if no SET found
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: prototypes for netdata python.d modules
3 # Author: Pawel Krupa (paulfantom)
4
5 import time
6 import sys
7 import os
8 import socket
9 import select
10 try:
11     import urllib.request as urllib2
12 except ImportError:
13     import urllib2
14
15 from subprocess import Popen, PIPE
16
17 import threading
18 import msg
19
20
21 class BaseService(threading.Thread):
22     """
23     Prototype of Service class.
24     Implemented basic functionality to run jobs by `python.d.plugin`
25     """
26     def __init__(self, configuration=None, name=None):
27         """
28         This needs to be initialized in child classes
29         :param configuration: dict
30         :param name: str
31         """
32         threading.Thread.__init__(self)
33         self._data_stream = ""
34         self.daemon = True
35         self.retries = 0
36         self.retries_left = 0
37         self.priority = 140000
38         self.update_every = 1
39         self.name = name
40         self.override_name = None
41         self.chart_name = ""
42         self._dimensions = []
43         self._charts = []
44         self.__chart_set = False
45         if configuration is None:
46             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
47             raise RuntimeError
48         else:
49             self._extract_base_config(configuration)
50             self.timetable = {}
51             self.create_timetable()
52
53     def _extract_base_config(self, config):
54         """
55         Get basic parameters to run service
56         Minimum config:
57             config = {'update_every':1,
58                       'priority':100000,
59                       'retries':0}
60         :param config: dict
61         """
62         try:
63             self.override_name = config.pop('name')
64         except KeyError:
65             pass
66         self.update_every = int(config.pop('update_every'))
67         self.priority = int(config.pop('priority'))
68         self.retries = int(config.pop('retries'))
69         self.retries_left = self.retries
70         self.configuration = config
71
72     def create_timetable(self, freq=None):
73         """
74         Create service timetable.
75         `freq` is optional
76         Example:
77             timetable = {'last': 1466370091.3767564,
78                          'next': 1466370092,
79                          'freq': 1}
80         :param freq: int
81         """
82         if freq is None:
83             freq = self.update_every
84         now = time.time()
85         self.timetable = {'last': now,
86                           'next': now - (now % freq) + freq,
87                           'freq': freq}
88
89     def _run_once(self):
90         """
91         Executes self.update(interval) and draws run time chart.
92         Return value presents exit status of update()
93         :return: boolean
94         """
95         t_start = time.time()
96         # check if it is time to execute job update() function
97         if self.timetable['next'] > t_start:
98             #msg.debug(self.chart_name + " will be run in " +
99             #          str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
100             msg.debug(self.chart_name, "will be run in", str(int((self.timetable['next'] - t_start) * 1000)), "ms")
101             return True
102
103         since_last = int((t_start - self.timetable['last']) * 1000000)
104         #msg.debug(self.chart_name +
105         #          " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
106         #          " ms (update_every: " + str(self.timetable['freq'] * 1000) +
107         #          " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
108         msg.debug(self.chart_name,
109                   "ready to run, after", str(int((t_start - self.timetable['last']) * 1000)),
110                   "ms (update_every:", str(self.timetable['freq'] * 1000),
111                   "ms, latency:", str(int((t_start - self.timetable['next']) * 1000)), "ms")
112         if not self.update(since_last):
113             self.error("update function failed.")
114             return False
115         t_end = time.time()
116         self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
117         # draw performance graph
118         run_time = str(int((t_end - t_start) * 1000))
119         #run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
120         #run_time_chart += "SET run_time = " + run_time + '\n'
121         #run_time_chart += "END\n"
122         #sys.stdout.write(run_time_chart)
123         sys.stdout.write("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" % \
124                          (self.chart_name, str(since_last), run_time))
125
126         #msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
127         msg.debug(self.chart_name, "updated in", str(run_time), "ms")
128         self.timetable['last'] = t_start
129         return True
130
131     def run(self):
132         """
133         Runs job in thread. Handles retries.
134         Exits when job failed or timed out.
135         :return: None
136         """
137         self.timetable['last'] = time.time()
138         while True:
139             try:
140                 status = self._run_once()
141             except Exception as e:
142                 msg.error("Something wrong: ", str(e))
143                 return
144             if status:
145                 time.sleep(self.timetable['next'] - time.time())
146                 self.retries_left = self.retries
147             else:
148                 self.retries_left -= 1
149                 if self.retries_left <= 0:
150                     msg.error("no more retries. Exiting")
151                     return
152                 else:
153                     time.sleep(self.timetable['freq'])
154
155     @staticmethod
156     def _format(*args):
157         params = []
158         append = params.append
159         for p in args:
160             if p is None:
161                 append(p)
162                 continue
163             if type(p) is not str:
164                 p = str(p)
165             if ' ' in p:
166                 p = "'" + p + "'"
167             append(p)
168         return params
169
170     def _line(self, instruction, *params):
171         """
172         Converts *params to string and joins them with one space between every one.
173         :param params: str/int/float
174         """
175         #self._data_stream += instruction
176         tmp = list(map((lambda x: "''" if x is None or len(x) == 0 else x), params))
177
178         self._data_stream += "%s %s\n" % (instruction, str(" ".join(tmp)))
179
180         # self.error(str(" ".join(tmp)))
181         # for p in params:
182         #     if p is None:
183         #         p = ""
184         #     else:
185         #         p = str(p)
186         #     if len(p) == 0:
187         #         p = "''"
188         #     if ' ' in p:
189         #         p = "'" + p + "'"
190         #     self._data_stream += " " + p
191         #self._data_stream += "\n"
192
193     def chart(self, type_id, name="", title="", units="", family="",
194               category="", charttype="line", priority="", update_every=""):
195         """
196         Defines a new chart.
197         :param type_id: str
198         :param name: str
199         :param title: str
200         :param units: str
201         :param family: str
202         :param category: str
203         :param charttype: str
204         :param priority: int/str
205         :param update_every: int/str
206         """
207         self._charts.append(type_id)
208         #self._line("CHART", type_id, name, title, units, family, category, charttype, priority, update_every)
209
210         p = self._format(type_id, name, title, units, family, category, charttype, priority, update_every)
211         self._line("CHART", *p)
212
213     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
214         """
215         Defines a new dimension for the chart
216         :param id: str
217         :param name: str
218         :param algorithm: str
219         :param multiplier: int/str
220         :param divisor: int/str
221         :param hidden: boolean
222         :return:
223         """
224         try:
225             int(multiplier)
226         except TypeError:
227             self.error("malformed dimension: multiplier is not a number:", multiplier)
228             multiplier = 1
229         try:
230             int(divisor)
231         except TypeError:
232             self.error("malformed dimension: divisor is not a number:", divisor)
233             divisor = 1
234         if name is None:
235             name = id
236         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
237             algorithm = "absolute"
238
239         self._dimensions.append(str(id))
240         if hidden:
241             p = self._format(id, name, algorithm, multiplier, divisor, "hidden")
242             #self._line("DIMENSION", id, name, algorithm, str(multiplier), str(divisor), "hidden")
243         else:
244             p = self._format(id, name, algorithm, multiplier, divisor)
245             #self._line("DIMENSION", id, name, algorithm, str(multiplier), str(divisor))
246
247         self._line("DIMENSION", *p)
248
249     def begin(self, type_id, microseconds=0):
250         """
251         Begin data set
252         :param type_id: str
253         :param microseconds: int
254         :return: boolean
255         """
256         if type_id not in self._charts:
257             self.error("wrong chart type_id:", type_id)
258             return False
259         try:
260             int(microseconds)
261         except TypeError:
262             self.error("malformed begin statement: microseconds are not a number:", microseconds)
263             microseconds = ""
264
265         self._line("BEGIN", type_id, str(microseconds))
266         return True
267
268     def set(self, id, value):
269         """
270         Set value to dimension
271         :param id: str
272         :param value: int/float
273         :return: boolean
274         """
275         if id not in self._dimensions:
276             self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions)
277             return False
278         try:
279             value = str(int(value))
280         except TypeError:
281             self.error("cannot set non-numeric value:", value)
282             return False
283         self._line("SET", id, "=", str(value))
284         self.__chart_set = True
285         return True
286
287     def end(self):
288         if self.__chart_set:
289             self._line("END")
290             self.__chart_set = False
291         else:
292             pos = self._data_stream.rfind("BEGIN")
293             self._data_stream = self._data_stream[:pos]
294
295     def commit(self):
296         """
297         Upload new data to netdata
298         """
299         print(self._data_stream)
300         self._data_stream = ""
301
302     def error(self, *params):
303         """
304         Show error message on stderr
305         """
306         msg.error(self.chart_name, *params)
307
308     def debug(self, *params):
309         """
310         Show debug message on stderr
311         """
312         msg.debug(self.chart_name, *params)
313
314     def info(self, *params):
315         """
316         Show information message on stderr
317         """
318         msg.info(self.chart_name, *params)
319
320     def check(self):
321         """
322         check() prototype
323         :return: boolean
324         """
325         msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
326         return False
327
328     def create(self):
329         """
330         create() prototype
331         :return: boolean
332         """
333         msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
334         return False
335
336     def update(self, interval):
337         """
338         update() prototype
339         :param interval: int
340         :return: boolean
341         """
342         msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
343         return False
344
345
346 class SimpleService(BaseService):
347     def __init__(self, configuration=None, name=None):
348         self.order = []
349         self.definitions = {}
350         BaseService.__init__(self, configuration=configuration, name=name)
351
352     def _get_data(self):
353         """
354         Get some data
355         :return: dict
356         """
357         return {}
358
359     def check(self):
360         """
361         :return:
362         """
363         return True
364
365     def create(self):
366         """
367         Create charts
368         :return: boolean
369         """
370         data = self._get_data()
371         if data is None:
372             return False
373
374         idx = 0
375         for name in self.order:
376             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
377             self.chart(self.chart_name + "." + name, *options)
378             # check if server has this datapoint
379             for line in self.definitions[name]['lines']:
380                 if line[0] in data:
381                     self.dimension(*line)
382             idx += 1
383
384         self.commit()
385         return True
386
387     def update(self, interval):
388         """
389         Update charts
390         :param interval: int
391         :return: boolean
392         """
393         data = self._get_data()
394         if data is None:
395             self.debug("_get_data() returned no data")
396             return False
397
398         updated = False
399         for chart in self.order:
400             if self.begin(self.chart_name + "." + chart, interval):
401                 updated = True
402                 for dim in self.definitions[chart]['lines']:
403                     try:
404                         self.set(dim[0], data[dim[0]])
405                     except KeyError:
406                         pass
407                 self.end()
408
409         self.commit()
410         if not updated:
411             self.error("no charts to update")
412
413         return updated
414
415
416 class UrlService(SimpleService):
417     def __init__(self, configuration=None, name=None):
418         self.url = ""
419         self.user = None
420         self.password = None
421         SimpleService.__init__(self, configuration=configuration, name=name)
422
423     def __add_auth(self):
424         passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
425         passman.add_password(None, self.url, self.user, self.password)
426         authhandler = urllib2.HTTPBasicAuthHandler(passman)
427         opener = urllib2.build_opener(authhandler)
428         urllib2.install_opener(opener)
429
430     def _get_raw_data(self):
431         """
432         Get raw data from http request
433         :return: str
434         """
435         raw = None
436         try:
437             f = urllib2.urlopen(self.url, timeout=self.update_every)
438         except Exception as e:
439             self.error(str(e))
440             return None
441
442         try:
443             raw = f.read().decode('utf-8')
444         except Exception as e:
445             self.error(str(e))
446         finally:
447             f.close()
448         return raw
449
450     def check(self):
451         """
452         Format configuration data and try to connect to server
453         :return: boolean
454         """
455         if self.name is None or self.name == str(None):
456             self.name = 'local'
457             self.chart_name += "_" + self.name
458         else:
459             self.name = str(self.name)
460         try:
461             self.url = str(self.configuration['url'])
462         except (KeyError, TypeError):
463             pass
464         try:
465             self.user = str(self.configuration['user'])
466         except (KeyError, TypeError):
467             pass
468         try:
469             self.password = str(self.configuration['pass'])
470         except (KeyError, TypeError):
471             pass
472
473         if self.user is not None and self.password is not None:
474             self.__add_auth()
475
476         if self._get_data() is not None:
477             return True
478         else:
479             return False
480
481
482 class SocketService(SimpleService):
483     def __init__(self, configuration=None, name=None):
484         self._sock = None
485         self._keep_alive = False
486         self.host = "localhost"
487         self.port = None
488         self.unix_socket = None
489         self.request = ""
490         self.__socket_config = None
491         SimpleService.__init__(self, configuration=configuration, name=name)
492
493     def _connect(self):
494         """
495         Recreate socket and connect to it since sockets cannot be reused after closing
496         Available configurations are IPv6, IPv4 or UNIX socket
497         :return:
498         """
499         try:
500             if self.unix_socket is None:
501                 if self.__socket_config is None:
502                     # establish ipv6 or ipv4 connection.
503                     for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
504                         try:
505                             af, socktype, proto, canonname, sa = res
506                             self._sock = socket.socket(af, socktype, proto)
507                         except socket.error as msg:
508                             self._sock = None
509                             continue
510                         try:
511                             self._sock.connect(sa)
512                         except socket.error as msg:
513                             self._disconnect()
514                             continue
515                         self.__socket_config = res
516                         break
517                 else:
518                     # connect to socket with previously established configuration
519                     try:
520                         af, socktype, proto, canonname, sa = self.__socket_config
521                         self._sock = socket.socket(af, socktype, proto)
522                         self._sock.connect(sa)
523                     except socket.error as msg:
524                         self._disconnect()
525             else:
526                 # connect to unix socket
527                 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
528                 self._sock.connect(self.unix_socket)
529         except Exception as e:
530             self.error(str(e),
531                        "Cannot create socket with following configuration: host:", str(self.host),
532                        "port:", str(self.port),
533                        "socket:", str(self.unix_socket))
534             self._sock = None
535         self._sock.setblocking(0)
536
537     def _disconnect(self):
538         """
539         Close socket connection
540         :return:
541         """
542         try:
543             self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
544             self._sock.close()
545         except Exception:
546             pass
547         self._sock = None
548
549     def _send(self):
550         """
551         Send request.
552         :return: boolean
553         """
554         # Send request if it is needed
555         if self.request != "".encode():
556             try:
557                 self._sock.send(self.request)
558             except Exception as e:
559                 self._disconnect()
560                 self.error(str(e),
561                            "used configuration: host:", str(self.host),
562                            "port:", str(self.port),
563                            "socket:", str(self.unix_socket))
564                 return False
565         return True
566
567     def _receive(self):
568         """
569         Receive data from socket
570         :return: str
571         """
572         data = ""
573         while True:
574             try:
575                 ready_to_read, _, in_error = select.select([self._sock], [], [], 15)
576             except Exception as e:
577                 self.debug("SELECT", str(e))
578                 self._disconnect()
579                 break
580             if len(ready_to_read) > 0:
581                 buf = self._sock.recv(4096)
582                 if len(buf) == 0 or buf is None:  # handle server disconnect
583                     break
584                 data += buf.decode()
585                 if self._check_raw_data(data):
586                     break
587             else:
588                 self.error("Socket timed out.")
589                 self._disconnect()
590                 break
591
592         return data
593
594     def _get_raw_data(self):
595         """
596         Get raw data with low-level "socket" module.
597         :return: str
598         """
599         if self._sock is None:
600             self._connect()
601
602         # Send request if it is needed
603         if not self._send():
604             return None
605
606         data = self._receive()
607
608         if not self._keep_alive:
609             self._disconnect()
610
611         return data
612
613     def _check_raw_data(self, data):
614         """
615         Check if all data has been gathered from socket
616         :param data: str
617         :return: boolean
618         """
619         return True
620
621     def _parse_config(self):
622         """
623         Parse configuration data
624         :return: boolean
625         """
626         if self.name is None or self.name == str(None):
627             self.name = ""
628         else:
629             self.name = str(self.name)
630         try:
631             self.unix_socket = str(self.configuration['socket'])
632         except (KeyError, TypeError):
633             self.debug("No unix socket specified. Trying TCP/IP socket.")
634             try:
635                 self.host = str(self.configuration['host'])
636             except (KeyError, TypeError):
637                 self.debug("No host specified. Using: '" + self.host + "'")
638             try:
639                 self.port = int(self.configuration['port'])
640             except (KeyError, TypeError):
641                 self.debug("No port specified. Using: '" + str(self.port) + "'")
642         try:
643             self.request = str(self.configuration['request'])
644         except (KeyError, TypeError):
645             self.debug("No request specified. Using: '" + str(self.request) + "'")
646         self.request = self.request.encode()
647
648
649 class LogService(SimpleService):
650     def __init__(self, configuration=None, name=None):
651         self.log_path = ""
652         self._last_position = 0
653         # self._log_reader = None
654         SimpleService.__init__(self, configuration=configuration, name=name)
655         self.retries = 100000  # basically always retry
656
657     def _get_raw_data(self):
658         """
659         Get log lines since last poll
660         :return: list
661         """
662         lines = []
663         try:
664             if os.path.getsize(self.log_path) < self._last_position:
665                 self._last_position = 0
666             elif os.path.getsize(self.log_path) == self._last_position:
667                 self.debug("Log file hasn't changed. No new data.")
668                 return None
669             with open(self.log_path, "r") as fp:
670                 fp.seek(self._last_position)
671                 for i, line in enumerate(fp):
672                     lines.append(line)
673                 self._last_position = fp.tell()
674         except Exception as e:
675             self.error(str(e))
676
677         if len(lines) != 0:
678             return lines
679         else:
680             self.error("No data collected.")
681             return None
682
683     def check(self):
684         """
685         Parse basic configuration and check if log file exists
686         :return: boolean
687         """
688         if self.name is not None or self.name != str(None):
689             self.name = ""
690         else:
691             self.name = str(self.name)
692         try:
693             self.log_path = str(self.configuration['path'])
694         except (KeyError, TypeError):
695             self.error("No path to log specified. Using: '" + self.log_path + "'")
696
697         if os.access(self.log_path, os.R_OK):
698             return True
699         else:
700             self.error("Cannot access file: '" + self.log_path + "'")
701             return False
702
703     def create(self):
704         status = SimpleService.create(self)
705         self._last_position = 0
706         return status
707
708
709 class ExecutableService(SimpleService):
710     #command_whitelist = ['exim', 'postqueue']
711     bad_substrings = ('&', '|', ';', '>', '<')
712
713     def __init__(self, configuration=None, name=None):
714         self.command = ""
715         SimpleService.__init__(self, configuration=configuration, name=name)
716
717     def _get_raw_data(self):
718         """
719         Get raw data from executed command
720         :return: str
721         """
722         try:
723             p = Popen(self.command, stdout=PIPE, stderr=PIPE)
724         except Exception as e:
725             self.error("Executing command", self.command, "resulted in error:", str(e))
726             return None
727         data = []
728         for line in p.stdout.readlines():
729             data.append(str(line.decode()))
730
731         if len(data) == 0:
732             self.error("No data collected.")
733             return None
734
735         return data
736
737     def check(self):
738         """
739         Parse basic configuration, check if command is whitelisted and is returning values
740         :return: boolean
741         """
742         if self.name is not None or self.name != str(None):
743             self.name = ""
744         else:
745             self.name = str(self.name)
746         try:
747             self.command = str(self.configuration['command'])
748         except (KeyError, TypeError):
749             self.error("No command specified. Using: '" + self.command + "'")
750         self.command = self.command.split(' ')
751         #if self.command[0] not in self.command_whitelist:
752         #    self.error("Command is not whitelisted.")
753         #    return False
754
755         for arg in self.command[1:]:
756             if any(st in arg for st in self.bad_substrings):
757                 self.error("Bad command argument:" + " ".join(self.command[1:]))
758                 return False
759         # test command and search for it in /usr/sbin or /sbin when failed
760         base = self.command[0].split('/')[-1]
761         if self._get_raw_data() is None:
762             for prefix in ['/sbin/', '/usr/sbin/']:
763                 self.command[0] = prefix + base
764                 if os.path.isfile(self.command[0]):
765                     break
766                 #if self._get_raw_data() is not None:
767                 #    break
768
769         if self._get_data() is None or len(self._get_data()) == 0:
770             self.error("Command", self.command, "returned no data")
771             return False
772         return True