]> arthur.barton.de Git - netdata.git/blob - python.d/elasticsearch.chart.py
elastic_search plugin: split queues and rejections chart into two
[netdata.git] / python.d / elasticsearch.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: elastic search node stats netdata python.d module
3 # Author: l2isbad
4
5 from base import UrlService
6 from requests import get
7 from socket import gethostbyname
8 try:
9         from queue import Queue
10 except ImportError:
11         from Queue import Queue
12 from threading import Thread
13
14 # default module values (can be overridden per job in `config`)
15 # update_every = 2
16 update_every = 5
17 priority = 60000
18 retries = 60
19
20 # charts order (can be overridden if you want less charts, or different order)
21 ORDER = ['search_perf_total', 'search_perf_time', 'search_latency', 'index_perf_total', 'index_perf_time',
22          'index_latency', 'jvm_mem_heap', 'jvm_gc_count', 'jvm_gc_time', 'host_metrics_file_descriptors',
23          'host_metrics_http', 'host_metrics_transport', 'thread_pool_qr_q', 'thread_pool_qr_r', 'fdata_cache',
24          'fdata_ev_tr', 'cluster_health_status', 'cluster_health_nodes', 'cluster_health_shards',
25          'cluster_stats_nodes', 'cluster_stats_query_cache', 'cluster_stats_docs', 'cluster_stats_store',
26          'cluster_stats_indices_shards']
27
28 CHARTS = {
29     'search_perf_total': {
30         'options': [None, 'Number of queries, fetches', 'queries', 'search performance', 'es.search_query', 'stacked'],
31         'lines': [
32             ['query_total', 'search_total', 'incremental'],
33             ['fetch_total', 'fetch_total', 'incremental'],
34             ['query_current', 'search_current', 'absolute'],
35             ['fetch_current', 'fetch_current', 'absolute']
36         ]},
37     'search_perf_time': {
38         'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance', 'es.search_time', 'stacked'],
39         'lines': [
40             ['query_time_in_millis', 'query', 'incremental', 1, 1000],
41             ['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
42         ]},
43     'search_latency': {
44         'options': [None, 'Query and fetch latency', 'ms', 'search performance', 'es.search_latency', 'stacked'],
45         'lines': [
46             ['query_latency', 'query', 'absolute', 1, 1000],
47             ['fetch_latency', 'fetch', 'absolute', 1, 1000]
48         ]},
49     'index_perf_total': {
50         'options': [None, 'Number of documents indexed, index refreshes, flushes', 'documents/indexes',
51                     'indexing performance', 'es.index_doc', 'stacked'],
52         'lines': [
53             ['indexing_index_total', 'indexed', 'incremental'],
54             ['refresh_total', 'refreshes', 'incremental'],
55             ['flush_total', 'flushes', 'incremental'],
56             ['indexing_index_current', 'indexed_current', 'absolute'],
57         ]},
58     'index_perf_time': {
59         'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'indexing performance',
60                     'es.search_time', 'stacked'],
61         'lines': [
62             ['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
63             ['refresh_total_time_in_millis', 'refreshing', 'incremental', 1, 1000],
64             ['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
65         ]},
66     'index_latency': {
67         'options': [None, 'Indexing and flushing latency', 'ms', 'indexing performance',
68                     'es.index_latency', 'stacked'],
69         'lines': [
70             ['indexing_latency', 'indexing', 'absolute', 1, 1000],
71             ['flushing_latency', 'flushing', 'absolute', 1, 1000]
72         ]},
73     'jvm_mem_heap': {
74         'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'memory usage and gc',
75                     'es.jvm_heap', 'area'],
76         'lines': [
77             ['jvm_heap_percent', 'inuse', 'absolute'],
78             ['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
79         ]},
80     'jvm_gc_count': {
81         'options': [None, 'Count of garbage collections', 'counts', 'memory usage and gc', 'es.gc_count', 'stacked'],
82         'lines': [
83             ['young_collection_count', 'young', 'incremental'],
84             ['old_collection_count', 'old', 'incremental']
85         ]},
86     'jvm_gc_time': {
87         'options': [None, 'Time spent on garbage collections', 'ms', 'memory usage and gc', 'es.gc_time', 'stacked'],
88         'lines': [
89             ['young_collection_time_in_millis', 'young', 'incremental'],
90             ['old_collection_time_in_millis', 'old', 'incremental']
91         ]},
92     'thread_pool_qr_q': {
93         'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
94                     'es.qr_queued', 'stacked'],
95         'lines': [
96             ['bulk_queue', 'bulk', 'absolute'],
97             ['index_queue', 'index', 'absolute'],
98             ['search_queue', 'search', 'absolute'],
99             ['merge_queue', 'merge', 'absolute']
100         ]},
101     'thread_pool_qr_r': {
102         'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
103                     'es.qr_rejected', 'stacked'],
104         'lines': [
105             ['bulk_rejected', 'bulk', 'absolute'],
106             ['index_rejected', 'index', 'absolute'],
107             ['search_rejected', 'search', 'absolute'],
108             ['merge_rejected', 'merge', 'absolute']
109         ]},
110     'fdata_cache': {
111         'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
112         'lines': [
113             ['index_fdata_mem', 'mem_size', 'absolute', 1, 1048576]
114         ]},
115     'fdata_ev_tr': {
116         'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
117                     'fielddata cache', 'es.fdata_ev_tr', 'line'],
118         'lines': [
119             ['index_fdata_evic', 'evictions', 'incremental'],
120             ['breakers_fdata_trip', 'tripped', 'incremental']
121         ]},
122     'cluster_health_nodes': {
123         'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
124                     'es.cluster_health', 'stacked'],
125         'lines': [
126             ['health_number_of_nodes', 'nodes', 'absolute'],
127             ['health_number_of_data_nodes', 'data_nodes', 'absolute'],
128             ['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
129             ['health_number_of_in_flight_fetch', 'inflight_fetch', 'absolute']
130         ]},
131     'cluster_health_status': {
132         'options': [None, 'Cluster status', 'status', 'cluster health API',
133                     'es.cluster_health_status', 'area'],
134         'lines': [
135             ['status_green', 'green', 'absolute'],
136             ['status_red', 'red', 'absolute'],
137             ['status_foo1', None, 'absolute'],
138             ['status_foo2', None, 'absolute'],
139             ['status_foo3', None, 'absolute'],
140             ['status_yellow', 'yellow', 'absolute']
141         ]},
142     'cluster_health_shards': {
143         'options': [None, 'Shards statistics', 'shards', 'cluster health API',
144                     'es.cluster_health_sharts', 'stacked'],
145         'lines': [
146             ['health_active_shards', 'active_shards', 'absolute'],
147             ['health_relocating_shards', 'relocating_shards', 'absolute'],
148             ['health_unassigned_shards', 'unassigned', 'absolute'],
149             ['health_delayed_unassigned_shards', 'delayed_unassigned', 'absolute'],
150             ['health_initializing_shards', 'initializing', 'absolute'],
151             ['health_active_shards_percent_as_number', 'active_percent', 'absolute']
152         ]},
153     'cluster_stats_nodes': {
154         'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
155                     'es.cluster_stats_nodes', 'stacked'],
156         'lines': [
157             ['count_data_only', 'data_only', 'absolute'],
158             ['count_master_data', 'master_data', 'absolute'],
159             ['count_total', 'total', 'absolute'],
160             ['count_master_only', 'master_only', 'absolute'],
161             ['count_client', 'client', 'absolute']
162         ]},
163     'cluster_stats_query_cache': {
164         'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
165                     'es.cluster_stats_query_cache', 'stacked'],
166         'lines': [
167             ['query_cache_hit_count', 'hit', 'incremental'],
168             ['query_cache_miss_count', 'miss', 'incremental']
169         ]},
170     'cluster_stats_docs': {
171         'options': [None, 'Docs statistics', 'count', 'cluster stats API',
172                     'es.cluster_stats_docs', 'line'],
173         'lines': [
174             ['docs_count', 'docs', 'absolute']
175         ]},
176     'cluster_stats_store': {
177         'options': [None, 'Store statistics', 'MB', 'cluster stats API',
178                     'es.cluster_stats_store', 'line'],
179         'lines': [
180             ['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
181         ]},
182     'cluster_stats_indices_shards': {
183         'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
184                     'es.cluster_stats_ind_sha', 'stacked'],
185         'lines': [
186             ['indices_count', 'indices', 'absolute'],
187             ['shards_total', 'shards', 'absolute']
188         ]},
189     'host_metrics_transport': {
190         'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
191                     'es.host_metrics_transport', 'area'],
192         'lines': [
193             ['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
194             ['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
195         ]},
196     'host_metrics_file_descriptors': {
197         'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
198                     'es.host_metrics_descriptors', 'area'],
199         'lines': [
200             ['file_descriptors_used', 'used', 'absolute', 1, 10]
201         ]},
202     'host_metrics_http': {
203         'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
204                     'es.host_metrics_http', 'line'],
205         'lines': [
206             ['http_current_open', 'opened', 'absolute', 1, 1]
207         ]}
208 }
209
210
211 class Service(UrlService):
212     def __init__(self, configuration=None, name=None):
213         UrlService.__init__(self, configuration=configuration, name=name)
214         self.order = ORDER
215         self.definitions = CHARTS
216         self.host = self.configuration.get('host')
217         self.port = self.configuration.get('port')
218         self.user = self.configuration.get('user')
219         self.password = self.configuration.get('pass')
220         self.latency = dict()
221
222     def check(self):
223         # We can't start if <host> AND <port> not specified
224         if not all([self.host, self.port]):
225             return False
226
227         # It as a bad idea to use hostname.
228         # Hostname -> ipaddress
229         try:
230             self.host = gethostbyname(self.host)
231         except Exception as e:
232             self.error(str(e))
233             return False
234
235         # HTTP Auth? NOT TESTED
236         self.auth = self.user and self.password
237
238         # Create URL for every Elasticsearch API
239         url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
240         url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
241         url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)
242
243         # Create list of enabled API calls
244         user_choice = [bool(self.configuration.get('node_stats', True)),
245                        bool(self.configuration.get('cluster_health', True)),
246                        bool(self.configuration.get('cluster_stats', True))]
247         
248         avail_methods = [(self._get_node_stats, url_node_stats), 
249                         (self._get_cluster_health, url_cluster_health),
250                         (self._get_cluster_stats, url_cluster_stats)]
251
252         # Remove disabled API calls from 'avail methods'
253         self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
254
255         # Run _get_data for ALL active API calls. 
256         api_result = {}
257         for method in self.methods:
258             api_result[method[1]] = (bool(self._get_raw_data(method[1])))
259
260         # We can start ONLY if all active API calls returned NOT None
261         if not all(api_result.values()):
262             self.error('Plugin could not get data from all APIs')
263             self.error('%s' % api_result)
264             return False
265         else:
266             self.info('%s' % api_result)
267             self.info('Plugin was started successfully')
268
269             return True
270
271     def _get_raw_data(self, url):
272         try:
273             if not self.auth:
274                 raw_data = get(url)
275             else:
276                 raw_data = get(url, auth=(self.user, self.password))
277         except Exception:
278             return None
279
280         return raw_data
281
282     def _get_data(self):
283         threads = list()
284         queue = Queue()
285         result = dict()
286
287         for method in self.methods:
288             th = Thread(target=method[0], args=(queue, method[1]))
289             th.start()
290             threads.append(th)
291
292         for thread in threads:
293             thread.join()
294             result.update(queue.get())
295
296         return result or None
297
298     def _get_cluster_health(self, queue, url):
299         """
300         Format data received from http request
301         :return: dict
302         """
303
304         data = self._get_raw_data(url)
305
306         if not data:
307             queue.put({})
308         else:
309             data = data.json() if '__call__' in dir(data.json) else data.json
310
311             to_netdata = dict()
312             to_netdata.update(update_key('health', data))
313             to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
314                                'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
315             to_netdata[''.join(['status_', to_netdata.get('health_status', '')])] = 1
316
317             queue.put(to_netdata)
318
319     def _get_cluster_stats(self, queue, url):
320         """
321         Format data received from http request
322         :return: dict
323         """
324
325         data = self._get_raw_data(url)
326
327         if not data:
328             queue.put({})
329         else:
330             data = data.json() if '__call__' in dir(data.json) else data.json
331
332             to_netdata = dict()
333             to_netdata.update(update_key('count', data['nodes']['count']))
334             to_netdata.update(update_key('query_cache', data['indices']['query_cache']))
335             to_netdata.update(update_key('docs', data['indices']['docs']))
336             to_netdata.update(update_key('store', data['indices']['store']))
337             to_netdata['indices_count'] = data['indices']['count']
338             to_netdata['shards_total'] = data['indices'].get('shards', {}).get('total')
339
340             queue.put(to_netdata)
341
342     def _get_node_stats(self, queue, url):
343         """
344         Format data received from http request
345         :return: dict
346         """
347
348         data = self._get_raw_data(url)
349
350         if not data:
351             queue.put({})
352         else:
353             data = data.json() if '__call__' in dir(data.json) else data.json
354
355             node = list(data['nodes'].keys())[0]
356             to_netdata = dict()
357             # Search performance metrics
358             to_netdata.update(data['nodes'][node]['indices']['search'])
359             to_netdata['query_latency'] = self.find_avg(to_netdata['query_total'],
360                                                to_netdata['query_time_in_millis'], 'query_latency')
361             to_netdata['fetch_latency'] = self.find_avg(to_netdata['fetch_total'],
362                                                to_netdata['fetch_time_in_millis'], 'fetch_latency')
363
364             # Indexing performance metrics
365             for key in ['indexing', 'refresh', 'flush']:
366                 to_netdata.update(update_key(key, data['nodes'][node]['indices'].get(key, {})))
367             to_netdata['indexing_latency'] = self.find_avg(to_netdata['indexing_index_total'],
368                                                to_netdata['indexing_index_time_in_millis'], 'index_latency')
369             to_netdata['flushing_latency'] = self.find_avg(to_netdata['flush_total'],
370                                                to_netdata['flush_total_time_in_millis'], 'flush_latency')
371             # Memory usage and garbage collection
372             to_netdata.update(update_key('young', data['nodes'][node]['jvm']['gc']['collectors']['young']))
373             to_netdata.update(update_key('old', data['nodes'][node]['jvm']['gc']['collectors']['old']))
374             to_netdata['jvm_heap_percent'] = data['nodes'][node]['jvm']['mem']['heap_used_percent']
375             to_netdata['jvm_heap_commit'] = data['nodes'][node]['jvm']['mem']['heap_committed_in_bytes']
376
377             # Thread pool queues and rejections
378             for key in ['bulk', 'index', 'search', 'merge']:
379                 to_netdata.update(update_key(key, data['nodes'][node]['thread_pool'].get(key, {})))
380
381             # Fielddata cache
382             to_netdata['index_fdata_mem'] = data['nodes'][node]['indices']['fielddata']['memory_size_in_bytes']
383             to_netdata['index_fdata_evic'] = data['nodes'][node]['indices']['fielddata']['evictions']
384             to_netdata['breakers_fdata_trip'] = data['nodes'][node]['breakers']['fielddata']['tripped']
385
386             # Host metrics
387             to_netdata.update(update_key('http', data['nodes'][node]['http']))
388             to_netdata.update(update_key('transport', data['nodes'][node]['transport']))
389             to_netdata['file_descriptors_used'] = round(float(data['nodes'][node]['process']['open_file_descriptors'])
390                                                         / data['nodes'][node]['process']['max_file_descriptors'] * 1000)
391             
392             queue.put(to_netdata)
393
394     def find_avg(self, value1, value2, key):
395         if key not in self.latency:
396             self.latency.update({key: [value1, value2]})
397             return 0
398         else:
399             if not self.latency[key][0] == value1:
400                 latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
401                 self.latency.update({key: [value1, value2]})
402                 return latency
403             else:
404                 self.latency.update({key: [value1, value2]})
405                 return 0
406
407
408 def update_key(string, dictionary):
409     return dict([('_'.join([string, elem[0]]), elem[1]) for elem in dictionary.items()])