]> arthur.barton.de Git - netdata.git/blob - python.d/elasticsearch.chart.py
elasticsearch plugin: small fixes
[netdata.git] / python.d / elasticsearch.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: elastic search node stats netdata python.d module
3 # Author: l2isbad
4
5 from base import UrlService
6 from requests import get
7 from socket import gethostbyname
8 try:
9         from queue import Queue
10 except ImportError:
11         from Queue import Queue
12 from threading import Thread
13
14 # default module values (can be overridden per job in `config`)
15 # update_every = 2
16 update_every = 5
17 priority = 60000
18 retries = 60
19
20 # charts order (can be overridden if you want less charts, or different order)
21 ORDER = ['search_perf_total', 'search_perf_time', 'search_latency', 'index_perf_total', 'index_perf_time',
22          'index_latency', 'jvm_mem_heap', 'jvm_gc_count', 'jvm_gc_time', 'thread_pool_qr', 'fdata_cache',
23          'fdata_ev_tr', 'cluster_health_status', 'cluster_health_nodes', 'cluster_health_shards', 'cluster_stats_nodes',
24          'cluster_stats_query_cache', 'cluster_stats_docs', 'cluster_stats_store', 'cluster_stats_indices_shards']
25
26 CHARTS = {
27     'search_perf_total': {
28         'options': [None, 'Number of queries, fetches', 'queries', 'Search performance', 'es.search_query', 'stacked'],
29         'lines': [
30             ['query_total', 'search_total', 'incremental'],
31             ["fetch_total", 'fetch_total', 'incremental'],
32             ["query_current", 'search_current', 'absolute'],
33             ["fetch_current", 'fetch_current', 'absolute']
34         ]},
35     'search_perf_time': {
36         'options': [None, 'Time spent on queries, fetches', 'seconds', 'Search performance', 'es.search_time', 'stacked'],
37         'lines': [
38             ["query_time_in_millis", 'query', 'incremental', 1, 1000],
39             ["fetch_time_in_millis", 'fetch', 'incremental', 1, 1000]
40         ]},
41     'search_latency': {
42         'options': [None, 'Query and fetch latency', 'ms', 'Search performance', 'es.search_latency', 'stacked'],
43         'lines': [
44             ["query_latency", 'query', 'absolute', 1, 1000],
45             ["fetch_latency", 'fetch', 'absolute', 1, 1000]
46         ]},
47     'index_perf_total': {
48         'options': [None, 'Number of documents indexed, index refreshes, flushes', 'documents/indexes',
49                     'Indexing performance', 'es.index_doc', 'stacked'],
50         'lines': [
51             ['indexing_index_total', 'indexed', 'incremental'],
52             ["refresh_total", 'refreshes', 'incremental'],
53             ["flush_total", 'flushes', 'incremental'],
54             ["indexing_index_current", 'indexed_current', 'absolute'],
55         ]},
56     'index_perf_time': {
57         'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'Indexing performance',
58                     'es.search_time', 'stacked'],
59         'lines': [
60             ['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
61             ['refresh_total_time_in_millis', 'refreshing', 'incremental', 1, 1000],
62             ['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
63         ]},
64     'index_latency': {
65         'options': [None, 'Indexing and flushing latency', 'ms', 'Indexing performance',
66                     'es.index_latency', 'stacked'],
67         'lines': [
68             ['indexing_latency', 'indexing', 'absolute', 1, 1000],
69             ['flushing_latency', 'flushing', 'absolute', 1, 1000]
70         ]},
71     'jvm_mem_heap': {
72         'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'Memory usage and gc',
73                     'es.jvm_heap', 'area'],
74         'lines': [
75             ["jvm_heap_percent", 'inuse', 'absolute'],
76             ["jvm_heap_commit", 'commit', 'absolute', -1, 1048576]
77         ]},
78     'jvm_gc_count': {
79         'options': [None, 'Count of garbage collections', 'counts', 'Memory usage and gc', 'es.gc_count', 'stacked'],
80         'lines': [
81             ["young_collection_count", 'young', 'incremental'],
82             ["old_collection_count", 'old', 'incremental']
83         ]},
84     'jvm_gc_time': {
85         'options': [None, 'Time spent on garbage collections', 'ms', 'Memory usage and gc', 'es.gc_time', 'stacked'],
86         'lines': [
87             ["young_collection_time_in_millis", 'young', 'incremental'],
88             ["old_collection_time_in_millis", 'old', 'incremental']
89         ]},
90     'thread_pool_qr': {
91         'options': [None, 'Number of queued/rejected threads in thread pool', 'threads', 'Queues and rejections',
92                     'es.qr', 'stacked'],
93         'lines': [
94             ["bulk_queue", 'bulk_queue', 'absolute'],
95             ["index_queue", 'index_queue', 'absolute'],
96             ["search_queue", 'search_queue', 'absolute'],
97             ["merge_queue", 'merge_queue', 'absolute'],
98             ["bulk_rejected", 'bulk_rej', 'absolute'],
99             ["index_rejected", 'index_rej', 'absolute'],
100             ["search_rejected", 'search_rej', 'absolute'],
101             ["merge_rejected", 'merge_rej', 'absolute']
102         ]},
103     'fdata_cache': {
104         'options': [None, 'Fielddata cache size', 'MB', 'Fielddata cache', 'es.fdata_cache', 'line'],
105         'lines': [
106             ["index_fdata_mem", 'mem_size', 'absolute', 1, 1048576]
107         ]},
108     'fdata_ev_tr': {
109         'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
110                     'Fielddata cache', 'es.fdata_ev_tr', 'line'],
111         'lines': [
112             ["index_fdata_evic", 'evictions', 'incremental'],
113             ["breakers_fdata_trip", 'tripped', 'incremental']
114         ]},
115     'cluster_health_nodes': {
116         'options': [None, 'Nodes and tasks statistics', 'units', 'Cluster health API',
117                     'es.cluster_health', 'stacked'],
118         'lines': [
119             ["health_number_of_nodes", 'nodes', 'absolute'],
120             ["health_number_of_data_nodes", 'data_nodes', 'absolute'],
121             ["health_number_of_pending_tasks", 'pending_tasks', 'absolute'],
122             ["health_number_of_in_flight_fetch", 'inflight_fetch', 'absolute']
123         ]},
124     'cluster_health_status': {
125         'options': [None, 'Cluster status', 'status', 'Cluster health API',
126                     'es.cluster_health_status', 'area'],
127         'lines': [
128             ["status_green", 'green', 'absolute'],
129             ["status_red", 'red', 'absolute'],
130             ["status_foo1", None, 'absolute'],
131             ["status_foo2", None, 'absolute'],
132             ["status_foo3", None, 'absolute'],
133             ["status_yellow", 'yellow', 'absolute']
134         ]},
135     'cluster_health_shards': {
136         'options': [None, 'Shards statistics', 'shards', 'Cluster health API',
137                     'es.cluster_health_sharts', 'stacked'],
138         'lines': [
139             ["health_active_shards", 'active_shards', 'absolute'],
140             ["health_relocating_shards", 'relocating_shards', 'absolute'],
141             ["health_unassigned_shards", 'unassigned', 'absolute'],
142             ["health_delayed_unassigned_shards", 'delayed_unassigned', 'absolute'],
143             ["health_initializing_shards", 'initializing', 'absolute'],
144             ["health_active_shards_percent_as_number", 'active_percent', 'absolute']
145         ]},
146     'cluster_stats_nodes': {
147         'options': [None, 'Nodes statistics', 'nodes', 'Cluster stats API',
148                     'es.cluster_stats_nodes', 'stacked'],
149         'lines': [
150             ['count_data_only', 'data_only', 'absolute'],
151             ['count_master_data', 'master_data', 'absolute'],
152             ['count_total', 'total', 'absolute'],
153             ['count_master_only', 'master_only', 'absolute'],
154             ['count_client', 'client', 'absolute']
155         ]},
156     'cluster_stats_query_cache': {
157         'options': [None, 'Query cache statistics', 'queries', 'Cluster stats API',
158                     'es.cluster_stats_query_cache', 'stacked'],
159         'lines': [
160             ['query_cache_hit_count', 'hit', 'incremental'],
161             ['query_cache_miss_count', 'miss', 'incremental']
162         ]},
163     'cluster_stats_docs': {
164         'options': [None, 'Docs statistics', 'count', 'Cluster stats API',
165                     'es.cluster_stats_docs', 'line'],
166         'lines': [
167             ['docs_count', 'docs', 'absolute']
168         ]},
169     'cluster_stats_store': {
170         'options': [None, 'Store statistics', 'MB', 'Cluster stats API',
171                     'es.cluster_stats_store', 'line'],
172         'lines': [
173             ['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
174         ]},
175     'cluster_stats_indices_shards': {
176         'options': [None, 'Indices and shards statistics', 'count', 'Cluster stats API',
177                     'es.cluster_stats_ind_sha', 'stacked'],
178         'lines': [
179             ['indices_count', 'indices', 'absolute'],
180             ['shards_total', 'shards', 'absolute']
181         ]}
182 }
183
184
185 class Service(UrlService):
186     def __init__(self, configuration=None, name=None):
187         UrlService.__init__(self, configuration=configuration, name=name)
188         self.order = ORDER
189         self.definitions = CHARTS
190         self.host = self.configuration.get('host')
191         self.port = self.configuration.get('port')
192         self.user = self.configuration.get('user')
193         self.password = self.configuration.get('pass')
194         self.latency = dict()
195
196     def check(self):
197         # We can't start if <host> AND <port> not specified
198         if not all([self.host, self.port]):
199             return False
200
201         # It as a bad idea to use hostname.
202         # Hostname -> ipaddress
203         try:
204             self.host = gethostbyname(self.host)
205         except Exception as e:
206             self.error(str(e))
207             return False
208
209         # HTTP Auth? NOT TESTED
210         self.auth = self.user and self.password
211
212         # Create URL for every Elasticsearch API
213         url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
214         url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
215         url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)
216
217         # Create list of enabled API calls
218         user_choice = [bool(self.configuration.get('node_stats', True)),
219                        bool(self.configuration.get('cluster_health', True)),
220                        bool(self.configuration.get('cluster_stats', True))]
221         
222         avail_methods = [(self._get_node_stats, url_node_stats), 
223                         (self._get_cluster_health, url_cluster_health),
224                         (self._get_cluster_stats, url_cluster_stats)]
225
226         # Remove disabled API calls from 'avail methods'
227         self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
228
229         # Run _get_data for ALL active API calls. 
230         api_result = {}
231         for method in self.methods:
232             api_result[method[1]] = (bool(self._get_raw_data(method[1])))
233
234         # We can start ONLY if all active API calls returned NOT None
235         if not all(api_result.values()):
236             self.error('Plugin could not get data from all APIs')
237             self.error('%s' % api_result)
238             return False
239         else:
240             self.info('%s' % api_result)
241             self.info('Plugin was started successfully')
242
243             return True
244
245     def _get_raw_data(self, url):
246         try:
247             if not self.auth:
248                 raw_data = get(url)
249             else:
250                 raw_data = get(url, auth=(self.user, self.password))
251         except Exception:
252             return None
253
254         return raw_data
255
256     def _get_data(self):
257         threads = list()
258         queue = Queue()
259         result = dict()
260
261         for method in self.methods:
262             th = Thread(target=method[0], args=(queue, method[1]))
263             th.start()
264             threads.append(th)
265
266         for thread in threads:
267             thread.join()
268             result.update(queue.get())
269
270         return result or None
271
272     def _get_cluster_health(self, queue, url):
273         """
274         Format data received from http request
275         :return: dict
276         """
277
278         data = self._get_raw_data(url)
279
280         if not data:
281             queue.put({})
282         else:
283             data = data.json()
284
285             to_netdata = dict()
286             to_netdata.update(update_key('health', data))
287             to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
288                                'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
289             to_netdata[''.join(['status_', to_netdata.get('health_status', '')])] = 1
290
291             queue.put(to_netdata)
292
293     def _get_cluster_stats(self, queue, url):
294         """
295         Format data received from http request
296         :return: dict
297         """
298
299         data = self._get_raw_data(url)
300
301         if not data:
302             queue.put({})
303         else:
304             data = data.json()
305
306             to_netdata = dict()
307             to_netdata.update(update_key('count', data['nodes']['count']))
308             to_netdata.update(update_key('query_cache', data['indices']['query_cache']))
309             to_netdata.update(update_key('docs', data['indices']['docs']))
310             to_netdata.update(update_key('store', data['indices']['store']))
311             to_netdata['indices_count'] = data['indices']['count']
312             to_netdata['shards_total'] = data['indices']['shards']['total']
313
314             queue.put(to_netdata)
315
316     def _get_node_stats(self, queue, url):
317         """
318         Format data received from http request
319         :return: dict
320         """
321
322         data = self._get_raw_data(url)
323
324         if not data:
325             queue.put({})
326         else:
327             data = data.json()
328             node = list(data['nodes'].keys())[0]
329             to_netdata = dict()
330             # Search performance metrics
331             to_netdata.update(data['nodes'][node]['indices']['search'])
332             to_netdata['query_latency'] = self.find_avg(to_netdata['query_total'],
333                                                to_netdata['query_time_in_millis'], 'query_latency')
334             to_netdata['fetch_latency'] = self.find_avg(to_netdata['fetch_total'],
335                                                to_netdata['fetch_time_in_millis'], 'fetch_latency')
336
337             # Indexing performance metrics
338             for key in ['indexing', 'refresh', 'flush']:
339                 to_netdata.update(update_key(key, data['nodes'][node]['indices'].get(key, {})))
340             to_netdata['indexing_latency'] = self.find_avg(to_netdata['indexing_index_total'],
341                                                to_netdata['indexing_index_time_in_millis'], 'index_latency')
342             to_netdata['flushing_latency'] = self.find_avg(to_netdata['flush_total'],
343                                                to_netdata['flush_total_time_in_millis'], 'flush_latency')
344             # Memory usage and garbage collection
345             to_netdata.update(update_key('young', data['nodes'][node]['jvm']['gc']['collectors']['young']))
346             to_netdata.update(update_key('old', data['nodes'][node]['jvm']['gc']['collectors']['old']))
347             to_netdata['jvm_heap_percent'] = data['nodes'][node]['jvm']['mem']['heap_used_percent']
348             to_netdata['jvm_heap_commit'] = data['nodes'][node]['jvm']['mem']['heap_committed_in_bytes']
349
350             # Thread pool queues and rejections
351             for key in ['bulk', 'index', 'search', 'merge']:
352                 to_netdata.update(update_key(key, data['nodes'][node]['thread_pool'].get(key, {})))
353
354             # Fielddata cache
355             to_netdata['index_fdata_mem'] = data['nodes'][node]['indices']['fielddata']['memory_size_in_bytes']
356             to_netdata['index_fdata_evic'] = data['nodes'][node]['indices']['fielddata']['evictions']
357             to_netdata['breakers_fdata_trip'] = data['nodes'][node]['breakers']['fielddata']['tripped']
358
359             queue.put(to_netdata)
360
361     def find_avg(self, value1, value2, key):
362         if key not in self.latency:
363             self.latency.update({key: [value1, value2]})
364             return 0
365         else:
366             if not self.latency[key][0] == value1:
367                 latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
368                 self.latency.update({key: [value1, value2]})
369                 return latency
370             else:
371                 self.latency.update({key: [value1, value2]})
372                 return 0
373
374
375 def update_key(string, dictionary):
376     return {'_'.join([string, k]): v for k, v in dictionary.items()}