]> arthur.barton.de Git - netdata.git/commitdiff
elasticsearch_plugin: code optimization
authorIlya <ilyamaschenko@gmail.com>
Mon, 27 Feb 2017 06:14:01 +0000 (15:14 +0900)
committerIlya <ilyamaschenko@gmail.com>
Mon, 27 Feb 2017 06:14:01 +0000 (15:14 +0900)
python.d/elasticsearch.chart.py

index 15d9538b4e5c24e91daf4d8c24ceebac6ae0d03b..de74a395ad1a3a43bab66fd82a00721843830fef 100644 (file)
@@ -10,6 +10,7 @@ try:
 except ImportError:
         from Queue import Queue
 from threading import Thread
+from collections import namedtuple
 
 # default module values (can be overridden per job in `config`)
 # update_every = 2
@@ -17,6 +18,73 @@ update_every = 5
 priority = 60000
 retries = 60
 
+METHODS = namedtuple('METHODS', ['get_data_function', 'url'])
+
+NODE_STATS = [
+    ('indices.search.fetch_current', None, None),
+    ('indices.search.fetch_total', None, None),
+    ('indices.search.query_current', None, None),
+    ('indices.search.query_total', None, None),
+    ('indices.search.query_time_in_millis', None, None),
+    ('indices.search.fetch_time_in_millis', None, None),
+    ('indices.indexing.index_total', 'indexing_index_total', None),
+    ('indices.indexing.index_current', 'indexing_index_current', None),
+    ('indices.indexing.index_time_in_millis', 'indexing_index_time_in_millis', None),
+    ('indices.refresh.total', 'refresh_total', None),
+    ('indices.refresh.total_time_in_millis', 'refresh_total_time_in_millis', None),
+    ('indices.flush.total', 'flush_total', None),
+    ('indices.flush.total_time_in_millis', 'flush_total_time_in_millis', None),
+    ('jvm.gc.collectors.young.collection_count', 'young_collection_count', None),
+    ('jvm.gc.collectors.old.collection_count', 'old_collection_count', None),
+    ('jvm.gc.collectors.young.collection_time_in_millis', 'young_collection_time_in_millis', None),
+    ('jvm.gc.collectors.old.collection_time_in_millis', 'old_collection_time_in_millis', None),
+    ('jvm.mem.heap_used_percent', 'jvm_heap_percent', None),
+    ('jvm.mem.heap_committed_in_bytes', 'jvm_heap_commit', None),
+    ('thread_pool.bulk.queue', 'bulk_queue', None),
+    ('thread_pool.bulk.rejected', 'bulk_rejected', None),
+    ('thread_pool.index.queue', 'index_queue', None),
+    ('thread_pool.index.rejected', 'index_rejected', None),
+    ('thread_pool.search.queue', 'search_queue', None),
+    ('thread_pool.search.rejected', 'search_rejected', None),
+    ('thread_pool.merge.queue', 'merge_queue', None),
+    ('thread_pool.merge.rejected', 'merge_rejected', None),
+    ('indices.fielddata.memory_size_in_bytes', 'index_fdata_memory', None),
+    ('indices.fielddata.evictions', None, None),
+    ('breakers.fielddata.tripped', None, None),
+    ('http.current_open', 'http_current_open', None),
+    ('transport.rx_size_in_bytes', 'transport_rx_size_in_bytes', None),
+    ('transport.tx_size_in_bytes', 'transport_tx_size_in_bytes', None),
+    ('process.max_file_descriptors', None, None),
+    ('process.open_file_descriptors', None, None)
+]
+
+CLUSTER_STATS = [
+    ('nodes.count.data_only', 'count_data_only', None),
+    ('nodes.count.master_data', 'count_master_data', None),
+    ('nodes.count.total', 'count_total', None),
+    ('nodes.count.master_only', 'count_master_only', None),
+    ('nodes.count.client', 'count_client', None),
+    ('indices.docs.count', 'docs_count', None),
+    ('indices.query_cache.hit_count', 'query_cache_hit_count', None),
+    ('indices.query_cache.miss_count', 'query_cache_miss_count', None),
+    ('indices.store.size_in_bytes', 'store_size_in_bytes', None),
+    ('indices.count', 'indices_count', None),
+    ('indices.shards.total', 'shards_total', None)
+]
+
+HEALTH_STATS = [
+    ('number_of_nodes', 'health_number_of_nodes', None),
+    ('number_of_data_nodes', 'health_number_of_data_nodes', None),
+    ('number_of_pending_tasks', 'health_number_of_pending_tasks', None),
+    ('number_of_in_flight_fetch', 'health_number_of_in_flight_fetch', None),
+    ('active_shards', 'health_active_shards', None),
+    ('relocating_shards', 'health_relocating_shards', None),
+    ('unassigned_shards', 'health_unassigned_shards', None),
+    ('delayed_unassigned_shards', 'health_delayed_unassigned_shards', None),
+    ('initializing_shards', 'health_initializing_shards', None),
+    ('active_shards_percent_as_number', 'health_active_shards_percent_as_number', None)
+]
+
 # charts order (can be overridden if you want less charts, or different order)
 ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search_latency', 'index_perf_total',
          'index_perf_current', 'index_perf_time', 'index_latency', 'jvm_mem_heap', 'jvm_gc_count',
@@ -27,19 +95,22 @@ ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search
 
 CHARTS = {
     'search_perf_total': {
-        'options': [None, 'Total number of queries, fetches', 'number of', 'search performance', 'es.search_query_total', 'stacked'],
+        'options': [None, 'Total number of queries, fetches', 'number of', 'search performance',
+                    'es.search_query_total', 'stacked'],
         'lines': [
             ['query_total', 'queries', 'incremental'],
             ['fetch_total', 'fetches', 'incremental']
         ]},
     'search_perf_current': {
-        'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance', 'es.search_query_current', 'stacked'],
+        'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance',
+                    'es.search_query_current', 'stacked'],
         'lines': [
             ['query_current', 'queries', 'absolute'],
             ['fetch_current', 'fetches', 'absolute']
         ]},
     'search_perf_time': {
-        'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance', 'es.search_time', 'stacked'],
+        'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance',
+                    'es.search_time', 'stacked'],
         'lines': [
             ['query_time_in_millis', 'query', 'incremental', 1, 1000],
             ['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
@@ -100,7 +171,7 @@ CHARTS = {
         ]},
     'thread_pool_qr_q': {
         'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
-                    'es.qr_queued', 'stacked'],
+                    'es.thread_pool_queued', 'stacked'],
         'lines': [
             ['bulk_queue', 'bulk', 'absolute'],
             ['index_queue', 'index', 'absolute'],
@@ -109,7 +180,7 @@ CHARTS = {
         ]},
     'thread_pool_qr_r': {
         'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
-                    'es.qr_rejected', 'stacked'],
+                    'es.thread_pool_rejected', 'stacked'],
         'lines': [
             ['bulk_rejected', 'bulk', 'absolute'],
             ['index_rejected', 'index', 'absolute'],
@@ -119,23 +190,23 @@ CHARTS = {
     'fdata_cache': {
         'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
         'lines': [
-            ['index_fdata_mem', 'mem_size', 'absolute', 1, 1048576]
+            ['index_fdata_memory', 'cache', 'absolute', 1, 1048576]
         ]},
     'fdata_ev_tr': {
         'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
-                    'fielddata cache', 'es.fdata_ev_tr', 'line'],
+                    'fielddata cache', 'es.evictions_tripped', 'line'],
         'lines': [
-            ['index_fdata_evic', 'evictions', 'incremental'],
-            ['breakers_fdata_trip', 'tripped', 'incremental']
+            ['evictions', None, 'incremental'],
+            ['tripped', None, 'incremental']
         ]},
     'cluster_health_nodes': {
         'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
-                    'es.cluster_health', 'stacked'],
+                    'es.cluster_health_nodes', 'stacked'],
         'lines': [
             ['health_number_of_nodes', 'nodes', 'absolute'],
             ['health_number_of_data_nodes', 'data_nodes', 'absolute'],
             ['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
-            ['health_number_of_in_flight_fetch', 'inflight_fetch', 'absolute']
+            ['health_number_of_in_flight_fetch', 'in_flight_fetch', 'absolute']
         ]},
     'cluster_health_status': {
         'options': [None, 'Cluster status', 'status', 'cluster health API',
@@ -150,7 +221,7 @@ CHARTS = {
         ]},
     'cluster_health_shards': {
         'options': [None, 'Shards statistics', 'shards', 'cluster health API',
-                    'es.cluster_health_sharts', 'stacked'],
+                    'es.cluster_health_shards', 'stacked'],
         'lines': [
             ['health_active_shards', 'active_shards', 'absolute'],
             ['health_relocating_shards', 'relocating_shards', 'absolute'],
@@ -161,7 +232,7 @@ CHARTS = {
         ]},
     'cluster_stats_nodes': {
         'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
-                    'es.cluster_stats_nodes', 'stacked'],
+                    'es.cluster_nodes', 'stacked'],
         'lines': [
             ['count_data_only', 'data_only', 'absolute'],
             ['count_master_data', 'master_data', 'absolute'],
@@ -171,46 +242,46 @@ CHARTS = {
         ]},
     'cluster_stats_query_cache': {
         'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
-                    'es.cluster_stats_query_cache', 'stacked'],
+                    'es.cluster_query_cache', 'stacked'],
         'lines': [
             ['query_cache_hit_count', 'hit', 'incremental'],
             ['query_cache_miss_count', 'miss', 'incremental']
         ]},
     'cluster_stats_docs': {
         'options': [None, 'Docs statistics', 'count', 'cluster stats API',
-                    'es.cluster_stats_docs', 'line'],
+                    'es.cluster_docs', 'line'],
         'lines': [
             ['docs_count', 'docs', 'absolute']
         ]},
     'cluster_stats_store': {
         'options': [None, 'Store statistics', 'MB', 'cluster stats API',
-                    'es.cluster_stats_store', 'line'],
+                    'es.cluster_store', 'line'],
         'lines': [
             ['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
         ]},
     'cluster_stats_indices_shards': {
         'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
-                    'es.cluster_stats_ind_sha', 'stacked'],
+                    'es.cluster_indices_shards', 'stacked'],
         'lines': [
             ['indices_count', 'indices', 'absolute'],
             ['shards_total', 'shards', 'absolute']
         ]},
     'host_metrics_transport': {
         'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
-                    'es.host_metrics_transport', 'area'],
+                    'es.host_transport', 'area'],
         'lines': [
             ['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
             ['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
         ]},
     'host_metrics_file_descriptors': {
         'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
-                    'es.host_metrics_descriptors', 'area'],
+                    'es.host_descriptors', 'area'],
         'lines': [
             ['file_descriptors_used', 'used', 'absolute', 1, 10]
         ]},
     'host_metrics_http': {
         'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
-                    'es.host_metrics_http', 'line'],
+                    'es.host_http_connections', 'line'],
         'lines': [
             ['http_current_open', 'opened', 'absolute', 1, 1]
         ]}
@@ -227,6 +298,8 @@ class Service(UrlService):
         self.user = self.configuration.get('user')
         self.password = self.configuration.get('pass')
         self.latency = dict()
+        self.methods = list()
+        self.auth = self.user and self.password
 
     def check(self):
         # We can't start if <host> AND <port> not specified
@@ -234,16 +307,13 @@ class Service(UrlService):
             return False
 
         # It as a bad idea to use hostname.
-        # Hostname -> ipaddress
+        # Hostname -> ip address
         try:
             self.host = gethostbyname(self.host)
-        except Exception as e:
-            self.error(str(e))
+        except Exception as error:
+            self.error(str(error))
             return False
 
-        # HTTP Auth? NOT TESTED
-        self.auth = self.user and self.password
-
         # Create URL for every Elasticsearch API
         url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
         url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
@@ -253,26 +323,30 @@ class Service(UrlService):
         user_choice = [bool(self.configuration.get('node_stats', True)),
                        bool(self.configuration.get('cluster_health', True)),
                        bool(self.configuration.get('cluster_stats', True))]
-        
-        avail_methods = [(self._get_node_stats, url_node_stats), 
-                        (self._get_cluster_health, url_cluster_health),
-                        (self._get_cluster_stats, url_cluster_stats)]
+
+        avail_methods = [METHODS(get_data_function=self._get_node_stats_, url=url_node_stats),
+                         METHODS(get_data_function=self._get_cluster_health_, url=url_cluster_health),
+                         METHODS(get_data_function=self._get_cluster_stats_, url=url_cluster_stats)]
 
         # Remove disabled API calls from 'avail methods'
         self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
 
         # Run _get_data for ALL active API calls. 
-        api_result = {}
+        api_check_result = dict()
         for method in self.methods:
-            api_result[method[1]] = (bool(self._get_raw_data(method[1])))
+            try:
+                api_check_result[method.url] = (bool(method.get_data_function(None, method.url)))
+            except KeyError as error:
+                self.error('Failed to parse %s. Error: %s'  % (method.url, str(error)))
+                return False
 
         # We can start ONLY if all active API calls returned NOT None
-        if not all(api_result.values()):
+        if not all(api_check_result.values()):
             self.error('Plugin could not get data from all APIs')
-            self.error('%s' % api_result)
+            self.error('%s' % api_check_result)
             return False
         else:
-            self.info('%s' % api_result)
+            self.info('%s' % api_check_result)
             self.info('Plugin was started successfully')
 
             return True
@@ -294,7 +368,7 @@ class Service(UrlService):
         result = dict()
 
         for method in self.methods:
-            th = Thread(target=method[0], args=(queue, method[1]))
+            th = Thread(target=method.get_data_function, args=(queue, method.url))
             th.start()
             threads.append(th)
 
@@ -304,7 +378,7 @@ class Service(UrlService):
 
         return result or None
 
-    def _get_cluster_health(self, queue, url):
+    def _get_cluster_health_(self, queue, url):
         """
         Format data received from http request
         :return: dict
@@ -313,19 +387,20 @@ class Service(UrlService):
         data = self._get_raw_data(url)
 
         if not data:
-            queue.put({})
+            return queue.put(dict()) if queue else None
         else:
-            data = data.json() if '__call__' in dir(data.json) else data.json
+            data = data.json() if hasattr(data.json, '__call__') else data.json
+
+            to_netdata = fetch_data_(raw_data=data, metrics_list=HEALTH_STATS)
 
-            to_netdata = dict()
-            to_netdata.update(update_key('health', data))
             to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
                                'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
-            to_netdata[''.join(['status_', to_netdata.get('health_status', '')])] = 1
+            current_status = 'status_' + data['status']
+            to_netdata[current_status] = 1
 
-            queue.put(to_netdata)
+            return queue.put(to_netdata) if queue else to_netdata
 
-    def _get_cluster_stats(self, queue, url):
+    def _get_cluster_stats_(self, queue, url):
         """
         Format data received from http request
         :return: dict
@@ -334,21 +409,15 @@ class Service(UrlService):
         data = self._get_raw_data(url)
 
         if not data:
-            queue.put({})
+            return queue.put(dict()) if queue else None
         else:
-            data = data.json() if '__call__' in dir(data.json) else data.json
+            data = data.json() if hasattr(data.json, '__call__') else data.json
 
-            to_netdata = dict()
-            to_netdata.update(update_key('count', data['nodes']['count']))
-            to_netdata.update(update_key('query_cache', data['indices']['query_cache']))
-            to_netdata.update(update_key('docs', data['indices']['docs']))
-            to_netdata.update(update_key('store', data['indices']['store']))
-            to_netdata['indices_count'] = data['indices']['count']
-            to_netdata['shards_total'] = data['indices'].get('shards', {}).get('total')
+            to_netdata = fetch_data_(raw_data=data, metrics_list=CLUSTER_STATS)
 
-            queue.put(to_netdata)
+            return queue.put(to_netdata) if queue else to_netdata
 
-    def _get_node_stats(self, queue, url):
+    def _get_node_stats_(self, queue, url):
         """
         Format data received from http request
         :return: dict
@@ -357,50 +426,31 @@ class Service(UrlService):
         data = self._get_raw_data(url)
 
         if not data:
-            queue.put({})
+            return queue.put(dict()) if queue else None
         else:
-            data = data.json() if '__call__' in dir(data.json) else data.json
+            data = data.json() if hasattr(data.json, '__call__') else data.json
 
             node = list(data['nodes'].keys())[0]
-            to_netdata = dict()
-            # Search performance metrics
-            to_netdata.update(data['nodes'][node]['indices']['search'])
-            to_netdata['query_latency'] = self.find_avg(to_netdata['query_total'],
-                                               to_netdata['query_time_in_millis'], 'query_latency')
-            to_netdata['fetch_latency'] = self.find_avg(to_netdata['fetch_total'],
-                                               to_netdata['fetch_time_in_millis'], 'fetch_latency')
-
-            # Indexing performance metrics
-            for key in ['indexing', 'refresh', 'flush']:
-                to_netdata.update(update_key(key, data['nodes'][node]['indices'].get(key, {})))
-            to_netdata['indexing_latency'] = self.find_avg(to_netdata['indexing_index_total'],
-                                               to_netdata['indexing_index_time_in_millis'], 'index_latency')
-            to_netdata['flushing_latency'] = self.find_avg(to_netdata['flush_total'],
-                                               to_netdata['flush_total_time_in_millis'], 'flush_latency')
-            # Memory usage and garbage collection
-            to_netdata.update(update_key('young', data['nodes'][node]['jvm']['gc']['collectors']['young']))
-            to_netdata.update(update_key('old', data['nodes'][node]['jvm']['gc']['collectors']['old']))
-            to_netdata['jvm_heap_percent'] = data['nodes'][node]['jvm']['mem']['heap_used_percent']
-            to_netdata['jvm_heap_commit'] = data['nodes'][node]['jvm']['mem']['heap_committed_in_bytes']
-
-            # Thread pool queues and rejections
-            for key in ['bulk', 'index', 'search', 'merge']:
-                to_netdata.update(update_key(key, data['nodes'][node]['thread_pool'].get(key, {})))
-
-            # Fielddata cache
-            to_netdata['index_fdata_mem'] = data['nodes'][node]['indices']['fielddata']['memory_size_in_bytes']
-            to_netdata['index_fdata_evic'] = data['nodes'][node]['indices']['fielddata']['evictions']
-            to_netdata['breakers_fdata_trip'] = data['nodes'][node]['breakers']['fielddata']['tripped']
-
-            # Host metrics
-            to_netdata.update(update_key('http', data['nodes'][node]['http']))
-            to_netdata.update(update_key('transport', data['nodes'][node]['transport']))
-            to_netdata['file_descriptors_used'] = round(float(data['nodes'][node]['process']['open_file_descriptors'])
-                                                        / data['nodes'][node]['process']['max_file_descriptors'] * 1000)
-            
-            queue.put(to_netdata)
-
-    def find_avg(self, value1, value2, key):
+            to_netdata = fetch_data_(raw_data=data['nodes'][node], metrics_list=NODE_STATS)
+
+            # Search performance latency
+            to_netdata['query_latency'] = self.find_avg_(to_netdata['query_total'],
+                                                         to_netdata['query_time_in_millis'], 'query_latency')
+            to_netdata['fetch_latency'] = self.find_avg_(to_netdata['fetch_total'],
+                                                         to_netdata['fetch_time_in_millis'], 'fetch_latency')
+
+            # Indexing performance latency
+            to_netdata['indexing_latency'] = self.find_avg_(to_netdata['indexing_index_total'],
+                                                            to_netdata['indexing_index_time_in_millis'], 'index_latency')
+            to_netdata['flushing_latency'] = self.find_avg_(to_netdata['flush_total'],
+                                                            to_netdata['flush_total_time_in_millis'], 'flush_latency')
+
+            to_netdata['file_descriptors_used'] = round(float(to_netdata['open_file_descriptors'])
+                                                        / to_netdata['max_file_descriptors'] * 1000)
+
+            return queue.put(to_netdata) if queue else to_netdata
+
+    def find_avg_(self, value1, value2, key):
         if key not in self.latency:
             self.latency.update({key: [value1, value2]})
             return 0
@@ -414,5 +464,17 @@ class Service(UrlService):
                 return 0
 
 
-def update_key(string, dictionary):
-    return dict([('_'.join([string, elem[0]]), elem[1]) for elem in dictionary.items()])
+def fetch_data_(raw_data, metrics_list):
+    to_netdata = dict()
+    for metric, new_name, function in metrics_list:
+        value = raw_data
+        for key in metric.split('.'):
+            try:
+                value = value[key]
+            except KeyError:
+                break
+        if not isinstance(value, dict) and key:
+            to_netdata[new_name or key] = value if not function else function(value)
+
+    return to_netdata
+