1 # -*- coding: utf-8 -*-
2 # Description: netdata python modules framework
3 # Author: Pawel Krupa (paulfantom)
6 # ALL CODE NEEDS TO BE COMPATIBLE WITH Python > 2.7 and Python > 3.1
7 # Follow PEP8 as much as it is possible
8 # "check" and "create" CANNOT be blocking.
9 # "update" CAN be blocking
10 # "update" function needs to be fast, so follow:
11 # https://wiki.python.org/moin/PythonSpeed/PerformanceTips
13 # - use local variables wherever it is possible
14 # - avoid dots in expressions that are executed many times
15 # - use "join()" instead of "+"
16 # - use "import" only at the beginning
18 # using ".encode()" in one thread can block other threads as well (only in python2)
27 from subprocess import Popen, PIPE
28 from sys import exc_info
31 from urlparse import urlparse
33 from urllib.parse import urlparse
36 import urllib.request as urllib2
45 import pymysql as MySQLdb
51 PATH = os.getenv('PATH').split(':')
52 except AttributeError:
53 PATH = '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'.split(':')
56 # class BaseService(threading.Thread):
57 class SimpleService(threading.Thread):
59 Prototype of Service class.
60 Implemented basic functionality to run jobs by `python.d.plugin`
62 def __init__(self, configuration=None, name=None):
64 This needs to be initialized in child classes
65 :param configuration: dict
68 threading.Thread.__init__(self)
69 self._data_stream = ""
73 self.priority = 140000
76 self.override_name = None
80 self.__chart_set = False
81 self.__first_run = True
84 self._data_from_check = dict()
85 if configuration is None:
86 self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
89 self._extract_base_config(configuration)
91 self.create_timetable()
93 # --- BASIC SERVICE CONFIGURATION ---
95 def _extract_base_config(self, config):
97 Get basic parameters to run service
99 config = {'update_every':1,
106 self.override_name = pop('name')
109 self.update_every = int(pop('update_every'))
110 self.priority = int(pop('priority'))
111 self.retries = int(pop('retries'))
112 self.retries_left = self.retries
113 self.configuration = config
115 def create_timetable(self, freq=None):
117 Create service timetable.
120 timetable = {'last': 1466370091.3767564,
126 freq = self.update_every
128 self.timetable = {'last': now,
129 'next': now - (now % freq) + freq,
132 # --- THREAD CONFIGURATION ---
136 Executes self.update(interval) and draws run time chart.
137 Return value presents exit status of update()
140 t_start = float(time.time())
141 chart_name = self.chart_name
143 since_last = int((t_start - self.timetable['last']) * 1000000)
147 if not self.update(since_last):
148 self.error("update function failed.")
151 # draw performance graph
152 run_time = int((time.time() - t_start) * 1000)
153 print("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" %
154 (self.chart_name, str(since_last), str(run_time)))
156 self.debug(chart_name, "updated in", str(run_time), "ms")
157 self.timetable['last'] = t_start
158 self.__first_run = False
163 Runs job in thread. Handles retries.
164 Exits when job failed or timed out.
167 step = float(self.timetable['freq'])
169 self.timetable['last'] = float(time.time() - step)
170 self.debug("starting data collection - update frequency:", str(step), " retries allowed:", str(self.retries))
171 while True: # run forever, unless something is wrong
172 now = float(time.time())
173 next = self.timetable['next'] = now - (now % step) + step + penalty
175 # it is important to do this in a loop
176 # sleep() is interruptible
178 self.debug("sleeping for", str(next - now), "secs to reach frequency of", str(step), "secs, now:", str(now), " next:", str(next), " penalty:", str(penalty))
179 time.sleep(next - now)
180 now = float(time.time())
184 status = self._run_once()
185 except Exception as e:
190 self.retries_left = self.retries
194 self.retries_left -= 1
195 if self.retries_left <= 0:
197 penalty = float(self.retries * step) / 2
204 self.retries_left = self.retries
205 self.alert("failed to collect data for " + str(self.retries) + " times - increasing penalty to " + str(penalty) + " sec and trying again")
208 self.error("failed to collect data - " + str(self.retries_left) + " retries left - penalty: " + str(penalty) + " sec")
215 Escape and convert passed arguments.
216 :param args: anything
220 append = params.append
225 if type(p) is not str:
def _line(self, instruction, *params):
    """
    Append one line to self._data_stream.
    The line consists of the instruction followed by every param, separated
    by single spaces and terminated with a newline.
    :param instruction: str
    :param params: str/int/float; None and empty values are written as ''
    """
    tokens = []
    for value in params:
        if value is None:
            tokens.append("''")
            continue
        try:
            is_empty = len(value) == 0
        except TypeError:
            # int/float have no len(); they are documented as valid params,
            # so write their string form instead of failing
            tokens.append(str(value))
            continue
        tokens.append("''" if is_empty else value)
    self._data_stream += "%s %s\n" % (instruction, str(" ".join(tokens)))
def chart(self, type_id, name="", title="", units="", family="",
          category="", chart_type="line", priority="", update_every=""):
    """
    Define a new chart.
    The chart id is remembered in self._charts, then all arguments are
    passed through self._format() and written as a "CHART" line with
    self._line().
    :param type_id: chart id, recorded in self._charts
    :param name: defaults to ""
    :param title: defaults to ""
    :param units: defaults to ""
    :param family: defaults to ""
    :param category: defaults to ""
    :param chart_type: str
    :param priority: int/str
    :param update_every: int/str
    """
    self._charts.append(type_id)

    fields = (type_id, name, title, units, family,
              category, chart_type, priority, update_every)
    self._line("CHART", *self._format(*fields))
260 def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
262 Defines a new dimension for the chart
265 :param algorithm: str
266 :param multiplier: int/str
267 :param divisor: int/str
268 :param hidden: boolean
274 self.error("malformed dimension: multiplier is not a number:", multiplier)
279 self.error("malformed dimension: divisor is not a number:", divisor)
283 if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
284 algorithm = "absolute"
286 self._dimensions.append(str(id))
288 p = self._format(id, name, algorithm, multiplier, divisor, "hidden")
290 p = self._format(id, name, algorithm, multiplier, divisor)
292 self._line("DIMENSION", *p)
294 def begin(self, type_id, microseconds=0):
298 :param microseconds: int
301 if type_id not in self._charts:
302 self.error("wrong chart type_id:", type_id)
307 self.error("malformed begin statement: microseconds are not a number:", microseconds)
310 self._line("BEGIN", type_id, str(microseconds))
313 def set(self, id, value):
315 Set value to dimension
317 :param value: int/float
320 if id not in self._dimensions:
321 self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions)
324 value = str(int(value))
326 self.error("cannot set non-numeric value:", str(value))
328 self._line("SET", id, "=", str(value))
329 self.__chart_set = True
335 self.__chart_set = False
337 pos = self._data_stream.rfind("BEGIN")
338 self._data_stream = self._data_stream[:pos]
342 Upload new data to netdata.
345 print(self._data_stream)
346 except Exception as e:
347 msg.fatal('cannot send data to netdata:', str(e))
348 self._data_stream = ""
350 # --- ERROR HANDLING ---
def error(self, *params):
    """
    Report an error message.
    The job's chart name is put in front of params and everything is
    handed to msg.error().
    """
    prefixed = (self.chart_name,) + params
    msg.error(*prefixed)
def alert(self, *params):
    """
    Report an alert message.
    The job's chart name is put in front of params and everything is
    handed to msg.alert().
    """
    prefixed = (self.chart_name,) + params
    msg.alert(*prefixed)
def debug(self, *params):
    """
    Report a debug message.
    The job's chart name is put in front of params and everything is
    handed to msg.debug().
    """
    prefixed = (self.chart_name,) + params
    msg.debug(*prefixed)
def info(self, *params):
    """
    Report an information message.
    The job's chart name is put in front of params and everything is
    handed to msg.info().
    """
    prefixed = (self.chart_name,) + params
    msg.info(*prefixed)
376 # --- MAIN METHODS ---
390 self.debug("Module", str(self.__module__), "doesn't implement check() function. Using default.")
391 data = self._get_data()
394 self.debug("failed to receive data during check().")
398 self.debug("empty data during check().")
401 self.debug("successfully received data during check(): '" + str(data) + "'")
409 data = self._data_from_check or self._get_data()
411 self.debug("failed to receive data during create().")
415 for name in self.order:
416 options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
417 self.chart(self.chart_name + "." + name, *options)
418 # check if server has this datapoint
419 for line in self.definitions[name]['lines']:
421 self.dimension(*line)
427 def update(self, interval):
433 data = self._get_data()
435 self.debug("failed to receive data during update().")
439 for chart in self.order:
440 if self.begin(self.chart_name + "." + chart, interval):
442 for dim in self.definitions[chart]['lines']:
444 self.set(dim[0], data[dim[0]])
451 self.error("no charts to update")
456 def find_binary(binary):
458 if isinstance(binary, str):
459 binary = os.path.basename(binary)
460 return next(('/'.join([p, binary]) for p in PATH
461 if os.path.isfile('/'.join([p, binary]))
462 and os.access('/'.join([p, binary]), os.X_OK)))
465 except StopIteration:
469 class UrlService(SimpleService):
def __init__(self, configuration=None, name=None):
    """
    Initialise the base service, then copy the URL related settings from
    self.configuration onto the instance. A setting missing from the
    configuration is stored as None.
    :param configuration: dict
    :param name: str
    """
    SimpleService.__init__(self, configuration=configuration, name=name)
    settings = self.configuration
    # (attribute on the instance, key in the configuration)
    for attribute, key in (('url', 'url'),
                           ('user', 'user'),
                           ('password', 'pass'),
                           ('ss_cert', 'ss_cert')):
        setattr(self, attribute, settings.get(key))
477 def __add_openers(self):
478 def self_signed_cert(ss_cert):
481 ctx = ssl.create_default_context()
482 ctx.check_hostname = False
483 ctx.verify_mode = ssl.CERT_NONE
484 return urllib2.build_opener(urllib2.HTTPSHandler(context=ctx))
485 except AttributeError:
490 self.opener = self_signed_cert(self.ss_cert) or urllib2.build_opener()
493 if self.user and self.password:
494 url_parse = urlparse(self.url)
495 top_level_url = '://'.join([url_parse.scheme, url_parse.netloc])
496 passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
497 passman.add_password(None, top_level_url, self.user, self.password)
498 self.opener.add_handler(urllib2.HTTPBasicAuthHandler(passman))
499 self.debug("Enabling HTTP basic auth")
501 def _get_raw_data(self, custom_url=None):
503 Get raw data from http request
509 f = self.opener.open(custom_url or self.url, timeout=self.update_every * 2)
510 raw_data = f.read().decode('utf-8', 'ignore')
511 except Exception as error:
512 self.error('Url: %s. Error: %s' %(custom_url or self.url, str(error)))
515 if f is not None: f.close()
517 return raw_data or None
521 Format configuration data and try to connect to server
524 if not (self.url and isinstance(self.url, str)):
525 self.error('URL is not defined or type is not <str>')
531 data = self._get_data()
532 except Exception as error:
533 self.error('_get_data() failed. Url: %s. Error: %s' % (self.url, error))
536 if isinstance(data, dict) and data:
537 self._data_from_check = data
540 self.error("_get_data() returned no data or type is not <dict>")
544 class SocketService(SimpleService):
545 def __init__(self, configuration=None, name=None):
547 self._keep_alive = False
548 self.host = "localhost"
550 self.unix_socket = None
552 self.__socket_config = None
553 self.__empty_request = "".encode()
554 SimpleService.__init__(self, configuration=configuration, name=name)
def _socketerror(self, message=None):
    """
    Report a socket problem with self.error(), prefixed by the endpoint:
    the unix socket path when one is set, otherwise host and port from the
    stored socket configuration, otherwise "unknown socket".
    :param message: str
    """
    if self.unix_socket is not None:
        self.error("unix socket '" + self.unix_socket + "':", message)
        return

    config = self.__socket_config
    if config is None:
        self.error("unknown socket:", message)
        return

    # stored configuration is a 5-item tuple; only the address is needed
    _af, _socktype, _proto, _canonname, address = config
    prefix = "".join(["socket to '", str(address[0]), "' port ", str(address[1]), ":"])
    self.error(prefix, message)
566 def _connect2socket(self, res=None):
568 Connect to a socket, passing the result of getaddrinfo()
572 res = self.__socket_config
574 self.error("Cannot create socket to 'None':")
577 af, socktype, proto, canonname, sa = res
579 self.debug("creating socket to '" + str(sa[0]) + "', port " + str(sa[1]))
580 self._sock = socket.socket(af, socktype, proto)
581 except socket.error as e:
582 self.error("Failed to create socket to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e))
584 self.__socket_config = None
588 self.debug("connecting socket to '" + str(sa[0]) + "', port " + str(sa[1]))
589 self._sock.connect(sa)
590 except socket.error as e:
591 self.error("Failed to connect to '" + str(sa[0]) + "', port " + str(sa[1]) + ":", str(e))
593 self.__socket_config = None
596 self.debug("connected to '" + str(sa[0]) + "', port " + str(sa[1]))
597 self.__socket_config = res
600 def _connect2unixsocket(self):
602 Connect to a unix socket, given its filename
605 if self.unix_socket is None:
606 self.error("cannot connect to unix socket 'None'")
610 self.debug("attempting DGRAM unix socket '" + str(self.unix_socket) + "'")
611 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
612 self._sock.connect(self.unix_socket)
613 self.debug("connected DGRAM unix socket '" + str(self.unix_socket) + "'")
615 except socket.error as e:
616 self.debug("Failed to connect DGRAM unix socket '" + str(self.unix_socket) + "':", str(e))
619 self.debug("attempting STREAM unix socket '" + str(self.unix_socket) + "'")
620 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
621 self._sock.connect(self.unix_socket)
622 self.debug("connected STREAM unix socket '" + str(self.unix_socket) + "'")
624 except socket.error as e:
625 self.debug("Failed to connect STREAM unix socket '" + str(self.unix_socket) + "':", str(e))
626 self.error("Failed to connect to unix socket '" + str(self.unix_socket) + "':", str(e))
632 Recreate socket and connect to it since sockets cannot be reused after closing
633 Available configurations are IPv6, IPv4 or UNIX socket
637 if self.unix_socket is not None:
638 self._connect2unixsocket()
641 if self.__socket_config is not None:
642 self._connect2socket()
644 for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
645 if self._connect2socket(res): break
647 except Exception as e:
649 self.__socket_config = None
651 if self._sock is not None:
652 self._sock.setblocking(0)
653 self._sock.settimeout(5)
654 self.debug("set socket timeout to: " + str(self._sock.gettimeout()))
656 def _disconnect(self):
658 Close socket connection
661 if self._sock is not None:
663 self.debug("closing socket")
664 self._sock.shutdown(2) # 0 - read, 1 - write, 2 - all
675 # Send request if it is needed
676 if self.request != self.__empty_request:
678 self.debug("sending request:", str(self.request))
679 self._sock.send(self.request)
680 except Exception as e:
681 self._socketerror("error sending request:" + str(e))
688 Receive data from socket
693 self.debug("receiving response")
695 buf = self._sock.recv(4096)
696 except Exception as e:
697 self._socketerror("failed to receive response:" + str(e))
701 if buf is None or len(buf) == 0: # handle server disconnect
703 self._socketerror("unexpectedly disconnected")
705 self.debug("server closed the connection")
709 self.debug("received data:", str(buf))
710 data += buf.decode('utf-8', 'ignore')
711 if self._check_raw_data(data):
714 self.debug("final response:", str(data))
717 def _get_raw_data(self):
719 Get raw data with low-level "socket" module.
722 if self._sock is None:
724 if self._sock is None:
727 # Send request if it is needed
731 data = self._receive()
733 if not self._keep_alive:
738 def _check_raw_data(self, data):
740 Check if all data has been gathered from socket
746 def _parse_config(self):
748 Parse configuration data
751 if self.name is None or self.name == str(None):
754 self.name = str(self.name)
757 self.unix_socket = str(self.configuration['socket'])
758 except (KeyError, TypeError):
759 self.debug("No unix socket specified. Trying TCP/IP socket.")
760 self.unix_socket = None
762 self.host = str(self.configuration['host'])
763 except (KeyError, TypeError):
764 self.debug("No host specified. Using: '" + self.host + "'")
766 self.port = int(self.configuration['port'])
767 except (KeyError, TypeError):
768 self.debug("No port specified. Using: '" + str(self.port) + "'")
771 self.request = str(self.configuration['request'])
772 except (KeyError, TypeError):
773 self.debug("No request specified. Using: '" + str(self.request) + "'")
775 self.request = self.request.encode()
779 return SimpleService.check(self)
782 class LogService(SimpleService):
783 def __init__(self, configuration=None, name=None):
785 self._last_position = 0
786 # self._log_reader = None
787 SimpleService.__init__(self, configuration=configuration, name=name)
788 self.retries = 100000 # basically always retry
790 def _get_raw_data(self):
792 Get log lines since last poll
797 if os.path.getsize(self.log_path) < self._last_position:
798 self._last_position = 0 # read from beginning if file has shrunk
799 elif os.path.getsize(self.log_path) == self._last_position:
800 self.debug("Log file hasn't changed. No new data.")
801 return [] # return empty list if nothing has changed
802 with open(self.log_path, "r") as fp:
803 fp.seek(self._last_position)
804 for i, line in enumerate(fp):
806 self._last_position = fp.tell()
807 except Exception as e:
813 self.error("No data collected.")
818 Parse basic configuration and check if log file exists
821 if self.name is not None or self.name != str(None):
824 self.name = str(self.name)
826 self.log_path = str(self.configuration['path'])
827 except (KeyError, TypeError):
828 self.info("No path to log specified. Using: '" + self.log_path + "'")
830 if os.access(self.log_path, os.R_OK):
833 self.error("Cannot access file: '" + self.log_path + "'")
837 # set cursor at last byte of log file
838 self._last_position = os.path.getsize(self.log_path)
839 status = SimpleService.create(self)
840 # self._last_position = 0
844 class ExecutableService(SimpleService):
846 def __init__(self, configuration=None, name=None):
847 SimpleService.__init__(self, configuration=configuration, name=name)
850 def _get_raw_data(self):
852 Get raw data from executed command
856 p = Popen(self.command, stdout=PIPE, stderr=PIPE)
857 except Exception as error:
858 self.error("Executing command", self.command, "resulted in error:", str(error))
861 for line in p.stdout.readlines():
862 data.append(line.decode())
868 Parse basic configuration, check if command is whitelisted and is returning values
871 # Preference: 1. "command" from configuration file 2. "command" from plugin (if specified)
872 if 'command' in self.configuration:
873 self.command = self.configuration['command']
875 # "command" must be: 1.not None 2. type <str>
876 if not (self.command and isinstance(self.command, str)):
877 self.error('Command is not defined or command type is not <str>')
880 # Split "command" into: 1. command <str> 2. options <list>
881 command, opts = self.command.split()[0], self.command.split()[1:]
883 # Check for "bad" symbols in options. No pipes, redirects etc. TODO: what is missing?
884 bad_opts = set(''.join(opts)) & set(['&', '|', ';', '>', '<'])
886 self.error("Bad command argument(s): %s" % bad_opts)
889 # Find absolute path ('echo' => '/bin/echo')
890 if '/' not in command:
891 command = self.find_binary(command)
893 self.error('Can\'t locate "%s" binary in PATH(%s)' % (self.command, PATH))
895 # Check if binary exists and is executable
897 if not (os.path.isfile(command) and os.access(command, os.X_OK)):
898 self.error('"%s" is not a file or not executable' % command)
901 self.command = [command] + opts if opts else [command]
904 data = self._get_data()
905 except Exception as error:
906 self.error('_get_data() failed. Command: %s. Error: %s' % (self.command, error))
909 if isinstance(data, dict) and data:
910 # We need this for create() method. No reason to execute get_data() again if result is not empty dict()
911 self._data_from_check = data
914 self.error("Command", str(self.command), "returned no data")
918 class MySQLService(SimpleService):
def __init__(self, configuration=None, name=None):
    """
    Initialise the base service and the MySQL specific state.
    No connection is opened here.
    :param configuration: dict
    :param name: str
    """
    SimpleService.__init__(self, configuration=configuration, name=name)
    # queries read from the configuration ('queries' key, empty dict if absent)
    self.__queries = self.configuration.get('queries', {})
    self.queries = {}
    # connection state: nothing connected, no properties collected yet
    self.__conn_properties = {}
    self.extra_conn_properties = {}
    self.__connection = None
930 connection = MySQLdb.connect(connect_timeout=self.update_every, **self.__conn_properties)
931 except (MySQLdb.MySQLError, TypeError, AttributeError) as error:
932 return None, str(error)
934 return connection, None
def get_connection_properties(conf, extra_conf):
    """
    Build the dict of connection properties from the job configuration.
    Credentials ('user', 'pass') are always taken when set. Exactly one
    endpoint is chosen, in this order: 'socket', 'host' (plus 'port'),
    'my.cnf'.
    :param conf: dict: job configuration
    :param extra_conf: dict or None: merged last, overrides values from conf
    :return: dict or None: None when no property was collected
    """
    properties = dict()
    if conf.get('user'):
        properties['user'] = conf['user']
    if conf.get('pass'):
        properties['passwd'] = conf['pass']

    if conf.get('socket'):
        properties['unix_socket'] = conf['socket']
    elif conf.get('host'):
        properties['host'] = conf['host']
        port = conf.get('port')
        # a 'port' key that is present but left empty must not reach int()
        if port is None or port == '':
            port = 3306
        properties['port'] = int(port)
    elif conf.get('my.cnf'):
        properties['read_default_file'] = conf['my.cnf']

    if isinstance(extra_conf, dict) and extra_conf:
        properties.update(extra_conf)

    return properties or None
955 def is_valid_queries_dict(raw_queries, log_error):
957 :param raw_queries: dict:
958 :param log_error: function:
959 :return: dict or None
961 raw_queries is valid when: type <dict> and not empty after is_valid_query(for all queries)
def is_valid_query(query):
    """
    Tell whether query is a string starting with SELECT or SHOW
    (all upper-case or all lower-case).
    :param query: anything
    :return: bool
    """
    # "and" short-circuits, so startswith() is never called on a non-string
    return isinstance(query, str) and query.startswith(('SELECT', 'select', 'SHOW', 'show'))
967 if hasattr(raw_queries, 'keys') and raw_queries:
968 valid_queries = dict([(n, q) for n, q in raw_queries.items() if is_valid_query(q)])
969 bad_queries = set(raw_queries) - set(valid_queries)
972 log_error('Removed query(s): %s' % bad_queries)
975 log_error('Unsupported "queries" format. Must be not empty <dict>')
979 self.error('MySQLdb or PyMySQL module is needed to use mysql.chart.py plugin')
982 # Preference: 1. "queries" from the configuration file 2. "queries" from the module
983 self.queries = self.__queries or self.queries
984 # Check that "self.queries" exists, is not empty and all queries are in a valid format
985 self.queries = is_valid_queries_dict(self.queries, self.error)
989 # Get connection properties
990 self.__conn_properties = get_connection_properties(self.configuration, self.extra_conn_properties)
991 if not self.__conn_properties:
992 self.error('Connection properties are missing')
995 # Create connection to the database
996 self.__connection, error = self.__connect()
998 self.error('Can\'t establish connection to MySQL: %s' % error)
1002 data = self._get_data()
1003 except Exception as error:
1004 self.error('_get_data() failed. Error: %s' % error)
1007 if isinstance(data, dict) and data:
1008 # We need this for create() method
1009 self._data_from_check = data
1012 self.error("_get_data() returned no data or type is not <dict>")
1015 def _get_raw_data(self, description=None):
1017 Get raw data from MySQL server
1018 :return: dict: fetchall() or (fetchall(), description)
1021 if not self.__connection:
1022 self.__connection, error = self.__connect()
1027 queries = dict(self.queries)
1029 with self.__connection as cursor:
1030 for name, query in queries.items():
1032 cursor.execute(query)
1033 except (MySQLdb.ProgrammingError, MySQLdb.OperationalError) as error:
1034 if self.__is_error_critical(err_class=exc_info()[0], err_text=str(error)):
1036 self.error('Removed query: %s[%s]. Error: %s'
1037 % (name, query, error))
1038 self.queries.pop(name)
1041 raw_data[name] = (cursor.fetchall(), cursor.description) if description else cursor.fetchall()
1042 self.__connection.commit()
1043 except (MySQLdb.MySQLError, RuntimeError, TypeError, AttributeError):
1044 self.__connection.close()
1045 self.__connection = None
1048 return raw_data or None
def __is_error_critical(err_class, err_text):
    """
    Classify a query error.
    :param err_class: exception class of the error
    :param err_text: str: error message
    :return: True only when err_class is MySQLdb.OperationalError and the
             message contains neither 'denied' nor 'Unknown column'
    """
    if err_class == MySQLdb.OperationalError:
        return not ('denied' in err_text or 'Unknown column' in err_text)
    return False