]> arthur.barton.de Git - netdata.git/blob - python.d/elasticsearch.chart.py
Merge pull request #1825 from l2isbad/varnish_plugin_p26
[netdata.git] / python.d / elasticsearch.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: elastic search node stats netdata python.d module
3 # Author: l2isbad
4
5 from base import UrlService
6 from requests import get
7 from socket import gethostbyname
8 try:
9         from queue import Queue
10 except ImportError:
11         from Queue import Queue
12 from threading import Thread
13
14 # default module values (can be overridden per job in `config`)
15 # update_every = 2
16 update_every = 5
17 priority = 60000
18 retries = 60
19
20 # charts order (can be overridden if you want less charts, or different order)
21 ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search_latency', 'index_perf_total',
22          'index_perf_current', 'index_perf_time', 'index_latency', 'jvm_mem_heap', 'jvm_gc_count',
23          'jvm_gc_time', 'host_metrics_file_descriptors', 'host_metrics_http', 'host_metrics_transport',
24          'thread_pool_qr_q', 'thread_pool_qr_r', 'fdata_cache', 'fdata_ev_tr', 'cluster_health_status',
25          'cluster_health_nodes', 'cluster_health_shards', 'cluster_stats_nodes', 'cluster_stats_query_cache',
26          'cluster_stats_docs', 'cluster_stats_store', 'cluster_stats_indices_shards']
27
28 CHARTS = {
29     'search_perf_total': {
30         'options': [None, 'Total number of queries, fetches', 'number of', 'search performance', 'es.search_query_total', 'stacked'],
31         'lines': [
32             ['query_total', 'queries', 'incremental'],
33             ['fetch_total', 'fetches', 'incremental']
34         ]},
35     'search_perf_current': {
36         'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance', 'es.search_query_current', 'stacked'],
37         'lines': [
38             ['query_current', 'queries', 'absolute'],
39             ['fetch_current', 'fetches', 'absolute']
40         ]},
41     'search_perf_time': {
42         'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance', 'es.search_time', 'stacked'],
43         'lines': [
44             ['query_time_in_millis', 'query', 'incremental', 1, 1000],
45             ['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
46         ]},
47     'search_latency': {
48         'options': [None, 'Query and fetch latency', 'ms', 'search performance', 'es.search_latency', 'stacked'],
49         'lines': [
50             ['query_latency', 'query', 'absolute', 1, 1000],
51             ['fetch_latency', 'fetch', 'absolute', 1, 1000]
52         ]},
53     'index_perf_total': {
54         'options': [None, 'Total number of documents indexed, index refreshes, index flushes to disk', 'number of',
55                     'indexing performance', 'es.index_performance_total', 'stacked'],
56         'lines': [
57             ['indexing_index_total', 'indexed', 'incremental'],
58             ['refresh_total', 'refreshes', 'incremental'],
59             ['flush_total', 'flushes', 'incremental']
60         ]},
61     'index_perf_current': {
62         'options': [None, 'Number of documents currently being indexed', 'currently indexed',
63                     'indexing performance', 'es.index_performance_current', 'stacked'],
64         'lines': [
65             ['indexing_index_current', 'documents', 'absolute']
66         ]},
67     'index_perf_time': {
68         'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'indexing performance',
69                     'es.search_time', 'stacked'],
70         'lines': [
71             ['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
72             ['refresh_total_time_in_millis', 'refreshing', 'incremental', 1, 1000],
73             ['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
74         ]},
75     'index_latency': {
76         'options': [None, 'Indexing and flushing latency', 'ms', 'indexing performance',
77                     'es.index_latency', 'stacked'],
78         'lines': [
79             ['indexing_latency', 'indexing', 'absolute', 1, 1000],
80             ['flushing_latency', 'flushing', 'absolute', 1, 1000]
81         ]},
82     'jvm_mem_heap': {
83         'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'memory usage and gc',
84                     'es.jvm_heap', 'area'],
85         'lines': [
86             ['jvm_heap_percent', 'inuse', 'absolute'],
87             ['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
88         ]},
89     'jvm_gc_count': {
90         'options': [None, 'Count of garbage collections', 'counts', 'memory usage and gc', 'es.gc_count', 'stacked'],
91         'lines': [
92             ['young_collection_count', 'young', 'incremental'],
93             ['old_collection_count', 'old', 'incremental']
94         ]},
95     'jvm_gc_time': {
96         'options': [None, 'Time spent on garbage collections', 'ms', 'memory usage and gc', 'es.gc_time', 'stacked'],
97         'lines': [
98             ['young_collection_time_in_millis', 'young', 'incremental'],
99             ['old_collection_time_in_millis', 'old', 'incremental']
100         ]},
101     'thread_pool_qr_q': {
102         'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
103                     'es.qr_queued', 'stacked'],
104         'lines': [
105             ['bulk_queue', 'bulk', 'absolute'],
106             ['index_queue', 'index', 'absolute'],
107             ['search_queue', 'search', 'absolute'],
108             ['merge_queue', 'merge', 'absolute']
109         ]},
110     'thread_pool_qr_r': {
111         'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
112                     'es.qr_rejected', 'stacked'],
113         'lines': [
114             ['bulk_rejected', 'bulk', 'absolute'],
115             ['index_rejected', 'index', 'absolute'],
116             ['search_rejected', 'search', 'absolute'],
117             ['merge_rejected', 'merge', 'absolute']
118         ]},
119     'fdata_cache': {
120         'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
121         'lines': [
122             ['index_fdata_mem', 'mem_size', 'absolute', 1, 1048576]
123         ]},
124     'fdata_ev_tr': {
125         'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
126                     'fielddata cache', 'es.fdata_ev_tr', 'line'],
127         'lines': [
128             ['index_fdata_evic', 'evictions', 'incremental'],
129             ['breakers_fdata_trip', 'tripped', 'incremental']
130         ]},
131     'cluster_health_nodes': {
132         'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
133                     'es.cluster_health', 'stacked'],
134         'lines': [
135             ['health_number_of_nodes', 'nodes', 'absolute'],
136             ['health_number_of_data_nodes', 'data_nodes', 'absolute'],
137             ['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
138             ['health_number_of_in_flight_fetch', 'inflight_fetch', 'absolute']
139         ]},
140     'cluster_health_status': {
141         'options': [None, 'Cluster status', 'status', 'cluster health API',
142                     'es.cluster_health_status', 'area'],
143         'lines': [
144             ['status_green', 'green', 'absolute'],
145             ['status_red', 'red', 'absolute'],
146             ['status_foo1', None, 'absolute'],
147             ['status_foo2', None, 'absolute'],
148             ['status_foo3', None, 'absolute'],
149             ['status_yellow', 'yellow', 'absolute']
150         ]},
151     'cluster_health_shards': {
152         'options': [None, 'Shards statistics', 'shards', 'cluster health API',
153                     'es.cluster_health_sharts', 'stacked'],
154         'lines': [
155             ['health_active_shards', 'active_shards', 'absolute'],
156             ['health_relocating_shards', 'relocating_shards', 'absolute'],
157             ['health_unassigned_shards', 'unassigned', 'absolute'],
158             ['health_delayed_unassigned_shards', 'delayed_unassigned', 'absolute'],
159             ['health_initializing_shards', 'initializing', 'absolute'],
160             ['health_active_shards_percent_as_number', 'active_percent', 'absolute']
161         ]},
162     'cluster_stats_nodes': {
163         'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
164                     'es.cluster_stats_nodes', 'stacked'],
165         'lines': [
166             ['count_data_only', 'data_only', 'absolute'],
167             ['count_master_data', 'master_data', 'absolute'],
168             ['count_total', 'total', 'absolute'],
169             ['count_master_only', 'master_only', 'absolute'],
170             ['count_client', 'client', 'absolute']
171         ]},
172     'cluster_stats_query_cache': {
173         'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
174                     'es.cluster_stats_query_cache', 'stacked'],
175         'lines': [
176             ['query_cache_hit_count', 'hit', 'incremental'],
177             ['query_cache_miss_count', 'miss', 'incremental']
178         ]},
179     'cluster_stats_docs': {
180         'options': [None, 'Docs statistics', 'count', 'cluster stats API',
181                     'es.cluster_stats_docs', 'line'],
182         'lines': [
183             ['docs_count', 'docs', 'absolute']
184         ]},
185     'cluster_stats_store': {
186         'options': [None, 'Store statistics', 'MB', 'cluster stats API',
187                     'es.cluster_stats_store', 'line'],
188         'lines': [
189             ['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
190         ]},
191     'cluster_stats_indices_shards': {
192         'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
193                     'es.cluster_stats_ind_sha', 'stacked'],
194         'lines': [
195             ['indices_count', 'indices', 'absolute'],
196             ['shards_total', 'shards', 'absolute']
197         ]},
198     'host_metrics_transport': {
199         'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
200                     'es.host_metrics_transport', 'area'],
201         'lines': [
202             ['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
203             ['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
204         ]},
205     'host_metrics_file_descriptors': {
206         'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
207                     'es.host_metrics_descriptors', 'area'],
208         'lines': [
209             ['file_descriptors_used', 'used', 'absolute', 1, 10]
210         ]},
211     'host_metrics_http': {
212         'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
213                     'es.host_metrics_http', 'line'],
214         'lines': [
215             ['http_current_open', 'opened', 'absolute', 1, 1]
216         ]}
217 }
218
219
220 class Service(UrlService):
221     def __init__(self, configuration=None, name=None):
222         UrlService.__init__(self, configuration=configuration, name=name)
223         self.order = ORDER
224         self.definitions = CHARTS
225         self.host = self.configuration.get('host')
226         self.port = self.configuration.get('port')
227         self.user = self.configuration.get('user')
228         self.password = self.configuration.get('pass')
229         self.latency = dict()
230
231     def check(self):
232         # We can't start if <host> AND <port> not specified
233         if not all([self.host, self.port]):
234             return False
235
236         # It as a bad idea to use hostname.
237         # Hostname -> ipaddress
238         try:
239             self.host = gethostbyname(self.host)
240         except Exception as e:
241             self.error(str(e))
242             return False
243
244         # HTTP Auth? NOT TESTED
245         self.auth = self.user and self.password
246
247         # Create URL for every Elasticsearch API
248         url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
249         url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
250         url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)
251
252         # Create list of enabled API calls
253         user_choice = [bool(self.configuration.get('node_stats', True)),
254                        bool(self.configuration.get('cluster_health', True)),
255                        bool(self.configuration.get('cluster_stats', True))]
256         
257         avail_methods = [(self._get_node_stats, url_node_stats), 
258                         (self._get_cluster_health, url_cluster_health),
259                         (self._get_cluster_stats, url_cluster_stats)]
260
261         # Remove disabled API calls from 'avail methods'
262         self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
263
264         # Run _get_data for ALL active API calls. 
265         api_result = {}
266         for method in self.methods:
267             api_result[method[1]] = (bool(self._get_raw_data(method[1])))
268
269         # We can start ONLY if all active API calls returned NOT None
270         if not all(api_result.values()):
271             self.error('Plugin could not get data from all APIs')
272             self.error('%s' % api_result)
273             return False
274         else:
275             self.info('%s' % api_result)
276             self.info('Plugin was started successfully')
277
278             return True
279
280     def _get_raw_data(self, url):
281         try:
282             if not self.auth:
283                 raw_data = get(url)
284             else:
285                 raw_data = get(url, auth=(self.user, self.password))
286         except Exception:
287             return None
288
289         return raw_data
290
291     def _get_data(self):
292         threads = list()
293         queue = Queue()
294         result = dict()
295
296         for method in self.methods:
297             th = Thread(target=method[0], args=(queue, method[1]))
298             th.start()
299             threads.append(th)
300
301         for thread in threads:
302             thread.join()
303             result.update(queue.get())
304
305         return result or None
306
307     def _get_cluster_health(self, queue, url):
308         """
309         Format data received from http request
310         :return: dict
311         """
312
313         data = self._get_raw_data(url)
314
315         if not data:
316             queue.put({})
317         else:
318             data = data.json() if '__call__' in dir(data.json) else data.json
319
320             to_netdata = dict()
321             to_netdata.update(update_key('health', data))
322             to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
323                                'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
324             to_netdata[''.join(['status_', to_netdata.get('health_status', '')])] = 1
325
326             queue.put(to_netdata)
327
328     def _get_cluster_stats(self, queue, url):
329         """
330         Format data received from http request
331         :return: dict
332         """
333
334         data = self._get_raw_data(url)
335
336         if not data:
337             queue.put({})
338         else:
339             data = data.json() if '__call__' in dir(data.json) else data.json
340
341             to_netdata = dict()
342             to_netdata.update(update_key('count', data['nodes']['count']))
343             to_netdata.update(update_key('query_cache', data['indices']['query_cache']))
344             to_netdata.update(update_key('docs', data['indices']['docs']))
345             to_netdata.update(update_key('store', data['indices']['store']))
346             to_netdata['indices_count'] = data['indices']['count']
347             to_netdata['shards_total'] = data['indices'].get('shards', {}).get('total')
348
349             queue.put(to_netdata)
350
351     def _get_node_stats(self, queue, url):
352         """
353         Format data received from http request
354         :return: dict
355         """
356
357         data = self._get_raw_data(url)
358
359         if not data:
360             queue.put({})
361         else:
362             data = data.json() if '__call__' in dir(data.json) else data.json
363
364             node = list(data['nodes'].keys())[0]
365             to_netdata = dict()
366             # Search performance metrics
367             to_netdata.update(data['nodes'][node]['indices']['search'])
368             to_netdata['query_latency'] = self.find_avg(to_netdata['query_total'],
369                                                to_netdata['query_time_in_millis'], 'query_latency')
370             to_netdata['fetch_latency'] = self.find_avg(to_netdata['fetch_total'],
371                                                to_netdata['fetch_time_in_millis'], 'fetch_latency')
372
373             # Indexing performance metrics
374             for key in ['indexing', 'refresh', 'flush']:
375                 to_netdata.update(update_key(key, data['nodes'][node]['indices'].get(key, {})))
376             to_netdata['indexing_latency'] = self.find_avg(to_netdata['indexing_index_total'],
377                                                to_netdata['indexing_index_time_in_millis'], 'index_latency')
378             to_netdata['flushing_latency'] = self.find_avg(to_netdata['flush_total'],
379                                                to_netdata['flush_total_time_in_millis'], 'flush_latency')
380             # Memory usage and garbage collection
381             to_netdata.update(update_key('young', data['nodes'][node]['jvm']['gc']['collectors']['young']))
382             to_netdata.update(update_key('old', data['nodes'][node]['jvm']['gc']['collectors']['old']))
383             to_netdata['jvm_heap_percent'] = data['nodes'][node]['jvm']['mem']['heap_used_percent']
384             to_netdata['jvm_heap_commit'] = data['nodes'][node]['jvm']['mem']['heap_committed_in_bytes']
385
386             # Thread pool queues and rejections
387             for key in ['bulk', 'index', 'search', 'merge']:
388                 to_netdata.update(update_key(key, data['nodes'][node]['thread_pool'].get(key, {})))
389
390             # Fielddata cache
391             to_netdata['index_fdata_mem'] = data['nodes'][node]['indices']['fielddata']['memory_size_in_bytes']
392             to_netdata['index_fdata_evic'] = data['nodes'][node]['indices']['fielddata']['evictions']
393             to_netdata['breakers_fdata_trip'] = data['nodes'][node]['breakers']['fielddata']['tripped']
394
395             # Host metrics
396             to_netdata.update(update_key('http', data['nodes'][node]['http']))
397             to_netdata.update(update_key('transport', data['nodes'][node]['transport']))
398             to_netdata['file_descriptors_used'] = round(float(data['nodes'][node]['process']['open_file_descriptors'])
399                                                         / data['nodes'][node]['process']['max_file_descriptors'] * 1000)
400             
401             queue.put(to_netdata)
402
403     def find_avg(self, value1, value2, key):
404         if key not in self.latency:
405             self.latency.update({key: [value1, value2]})
406             return 0
407         else:
408             if not self.latency[key][0] == value1:
409                 latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
410                 self.latency.update({key: [value1, value2]})
411                 return latency
412             else:
413                 self.latency.update({key: [value1, value2]})
414                 return 0
415
416
417 def update_key(string, dictionary):
418     return dict([('_'.join([string, elem[0]]), elem[1]) for elem in dictionary.items()])