From d383f17795b25f2829bf93b6d30748aa6dd98fd7 Mon Sep 17 00:00:00 2001
From: nepython
Date: Mon, 20 Jul 2020 21:51:34 +0530
Subject: [PATCH] [timeseries] Add top fields support

---
 README.rst                                    |  41 ++++-
 openwisp_monitoring/db/__init__.py            |   3 +-
 .../db/backends/elasticsearch/__init__.py     |   3 +
 .../db/backends/elasticsearch/client.py       | 144 ++++++++------
 .../db/backends/elasticsearch/index.py        |  49 +++---
 .../db/backends/elasticsearch/queries.py      |  86 ++++++++---
 .../db/backends/elasticsearch/settings.py     |   3 +
 .../db/backends/influxdb/client.py            |  34 ++++-
 .../db/backends/influxdb/queries.py           |   4 -
 .../db/backends/influxdb/tests.py             |   4 +-
 openwisp_monitoring/device/base/models.py     |  15 +-
 openwisp_monitoring/monitoring/base/models.py |  21 ++-
 openwisp_monitoring/monitoring/charts.py      |   1 +
 13 files changed, 280 insertions(+), 128 deletions(-)
 create mode 100644 openwisp_monitoring/db/backends/elasticsearch/settings.py

diff --git a/README.rst b/README.rst
index d0419b880..bce71bd40 100644
--- a/README.rst
+++ b/README.rst
@@ -24,7 +24,7 @@ Available Features
 * Collects and displays device status information like uptime, RAM status, CPU load averages,
   Interface addresses, WiFi interface status and associated clients, Neighbors information,
   DHCP Leases, Disk/Flash status
-* Collection of monitoring information in a timeseries database (currently only influxdb is supported)
+* Collection of monitoring information in a timeseries database (`InfluxDB <https://www.influxdata.com/>`_ and `Elasticsearch <https://www.elastic.co/elasticsearch/>`_ are currently supported)
 * Monitoring charts for uptime, packet loss, round trip time (latency), associated
   wifi clients, interface traffic, RAM usage, CPU load, flash/disk usage
 * Charts can be viewed at resolutions of 1 day, 3 days, a week, a month and a year
@@ -46,6 +46,8 @@ beforehand.
 In case you prefer not to use Docker you can `install InfluxDB <https://docs.influxdata.com/influxdb/v1.8/introduction/install/>`_
 and Redis from your repositories, but keep in mind that the version packaged by your distribution may be different.
 
+If you wish to use ``Elasticsearch`` for storing and retrieving timeseries data, then `install Elasticsearch <https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html>`_.
+
 Install spatialite and sqlite:
 
 .. code-block:: shell
@@ -106,6 +108,19 @@ Follow the setup instructions of `openwisp-controller
         'PORT': '8086',
     }
 
+If you wish to use ``Elasticsearch`` for timeseries data storage and retrieval,
+use the following settings:
+
+.. code-block:: python
+
+    TIMESERIES_DATABASE = {
+        'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
+        'USER': 'openwisp',
+        'PASSWORD': 'openwisp',
+        'NAME': 'openwisp2',
+        'HOST': 'localhost',
+        'PORT': '9200',
+    }
+
 ``urls.py``:
 
 .. code-block:: python
@@ -231,6 +246,9 @@ This data is only used to assess the recent status of devices, keeping it
 for a long time would not add much benefit and would cost a lot more in
 terms of disk space.
 
+**Note**: if you use ``Elasticsearch``, the retention time is truncated to an
+integral multiple of a day, e.g. ``36h0m0s`` is interpreted as ``24h0m0s``.
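+
+For example (the value below is illustrative):
+
+.. code-block:: python
+
+    # InfluxDB retains data for the full 36 hours, while Elasticsearch
+    # truncates the duration to 24h0m0s (one day)
+    OPENWISP_MONITORING_SHORT_RETENTION_POLICY = '36h0m0s'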
+
 ``OPENWISP_MONITORING_AUTO_PING``
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -360,11 +378,21 @@ MB (megabytes) instead of GB (Gigabytes) you can use:
                     "SUM(rx_bytes) / 1000000 AS download FROM {key} "
                     "WHERE time >= '{time}' AND content_type = '{content_type}' "
                     "AND object_id = '{object_id}' GROUP BY time(1d)"
-                )
+                ),
+                'elasticsearch': _make_query({
+                    'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
+                    'download': {'sum': {'field': 'points.fields.rx_bytes'}},
+                })
             },
         }
    }
 
+    # this needs to be declared separately, it is only used by the elasticsearch backend
+    OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = {
+        'upload': {'operator': '/', 'value': 1000000},
+        'download': {'operator': '/', 'value': 1000000},
+    }
+
 Or if you want to define a new chart configuration, which you can then
 call in your custom code (eg: a custom check class), you can do so as follows:
@@ -372,6 +400,8 @@ call in your custom code (eg: a custom check class), you can do so as follows:
 
     from django.utils.translation import gettext_lazy as _
 
+    from openwisp_monitoring.db.backends.elasticsearch import _make_query
+
     OPENWISP_MONITORING_CHARTS = {
         'ram': {
             'type': 'line',
@@ -385,7 +415,12 @@ call in your custom code (eg: a custom check class), you can do so as follows:
                     "MEAN(buffered) AS buffered FROM {key} WHERE time >= '{time}' AND "
                     "content_type = '{content_type}' AND object_id = '{object_id}' "
                     "GROUP BY time(1d)"
-                )
+                ),
+                'elasticsearch': _make_query({
+                    'total': {'avg': {'field': 'points.fields.total'}},
+                    'free': {'avg': {'field': 'points.fields.free'}},
+                    'buffered': {'avg': {'field': 'points.fields.buffered'}},
+                })
             },
         }
    }
diff --git a/openwisp_monitoring/db/__init__.py b/openwisp_monitoring/db/__init__.py
index 6d8207fa6..64510ebc5 100644
--- a/openwisp_monitoring/db/__init__.py
+++ b/openwisp_monitoring/db/__init__.py
@@ -1,6 +1,5 @@
 from .backends import timeseries_db
 
 chart_query = timeseries_db.queries.chart_query
-device_data_query = timeseries_db.queries.device_data_query
 
-__all__ = ['timeseries_db', 'chart_query', 'device_data_query']
+__all__ = ['timeseries_db', 'chart_query']
diff --git a/openwisp_monitoring/db/backends/elasticsearch/__init__.py b/openwisp_monitoring/db/backends/elasticsearch/__init__.py
index e69de29bb..5eb061d1b 100644
--- a/openwisp_monitoring/db/backends/elasticsearch/__init__.py
+++ b/openwisp_monitoring/db/backends/elasticsearch/__init__.py
@@ -0,0 +1,3 @@
+from .queries import _make_query
+
+__all__ = ['_make_query']
diff --git a/openwisp_monitoring/db/backends/elasticsearch/client.py b/openwisp_monitoring/db/backends/elasticsearch/client.py
index 3e07bd4c5..7271c4458 100644
--- a/openwisp_monitoring/db/backends/elasticsearch/client.py
+++ b/openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -1,5 +1,6 @@
 import json
 import logging
+from collections import Counter
 from copy import deepcopy
 from datetime import datetime, timedelta
 
@@ -70,7 +71,9 @@ def __init__(self, db_name='metric'):
 
     def create_database(self):
         """ creates connection to elasticsearch """
-        connections.create_connection(hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"])
+        connections.create_connection(
+            hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"]
+        )
         db = self.get_db
         # Skip if support for Index Lifecycle Management is disabled or no privileges
         self.ilm_enabled = db.ilm.start()['acknowledged']
@@ -122,15 +125,16 @@ def create_or_alter_retention_policy(self, name, duration=None):
         ilm.put_lifecycle(policy=name, body=policy)
 
     def query(self, query, precision=None):
-        index = query.pop('key')
-        return Search(using=self.get_db, index=index).from_dict(query).execute().to_dict()
+        if 'summary' in query:
+            query.pop('summary')
+        return Search(using=self.get_db).from_dict(query).execute().to_dict()
 
     def write(self, name, values, **kwargs):
         rp = kwargs.get('retention_policy')
         tags = kwargs.get('tags')
         timestamp = kwargs.get('timestamp')
-        metric_id = find_metric(self.get_db, name, tags, rp, add=True)
-        metric_index = MetricIndex.get(metric_id, index=name, using=self.get_db)
+        metric_id, index = find_metric(self.get_db, name, tags, rp, add=True)
+        metric_index = MetricIndex.get(metric_id, index=index, using=self.get_db)
         point = Point(fields=values, time=timestamp or datetime.now())
         metric_index.points.append(point)
         metric_index.save()
@@ -140,10 +144,11 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
         time_format = kwargs.get('time_format')
         # TODO: It will be of the form 'now() - <int>s'
         # since = kwargs.get('since')
-        metric_id = find_metric(self.get_db, key, tags)
-        if not metric_id:
+        try:
+            metric_id, index = find_metric(self.get_db, key, tags)
+        except TypeError:
             return []
-        metric_index = MetricIndex.get(index=key, id=metric_id, using=self.get_db)
+        metric_index = MetricIndex.get(index=index, id=metric_id, using=self.get_db)
         if order == 'time':
             points = list(metric_index.points[0:limit])
         elif order == '-time':
@@ -195,26 +200,31 @@ def _format_time(self, obj, time_format=None):
 
     def get_list_query(self, query, precision='s'):
         response = self.query(query, precision)
-        points = response['aggregations']['GroupByTime']['buckets']
-        list_points = self._fill_points(
-            query, [self._format(point) for point in points]
-        )
+        try:
+            points = response['aggregations']['GroupByTime']['set_range']['time'][
+                'buckets'
+            ]
+            list_points = self._fill_points(
+                query, [self._format(point) for point in points],
+            )
+        except KeyError:
+            return []
         return list_points
 
     def _fill_points(self, query, points):
-        _range = next(
-            (item for item in query['query']['bool']['must'] if item.get('range')), None
-        )
+        _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']
         if not _range or not points:
             return points
-        days = int(_range['range']['points.time']['from'][4:-3])
+        days = int(_range['filter']['range']['points.time']['from'][4:-3])
+        interval = _range['aggs']['time']['date_histogram']['fixed_interval']
+        # Check if summary query
+        if f'{days}d' == interval:
+            return points
+        interval_dict = {'10m': 600, '20m': 1200, '1h': 3600, '24h': 86400}
+        interval = interval_dict[interval]
         start_time = datetime.now()
         end_time = start_time - timedelta(days=days)  # include today
         dummy_point = deepcopy(points[0])
-        if len(points) > 2:
-            interval = points[0]['time'] - points[1]['time']
-        else:
-            interval = 600
         start_ts = points[0]['time'] + interval
         end_ts = points[-1]['time'] - interval
         for field in dummy_point.keys():
@@ -223,7 +233,7 @@
             dummy_point['time'] = start_ts
             points.insert(0, deepcopy(dummy_point))
             start_ts += interval
-        # TODO: This needs to be fixed and shouldn't be required since intervals are set
+        # TODO: Why is this required since intervals are set?
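+        # the date histogram may emit buckets outside the filtered time range;
+        # the loops below trim those buckets and pad both ends with empty
+        # points so that the full requested period is always covered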
         while points[-1]['time'] < end_time.timestamp():
             points.pop(-1)
         while end_ts > end_time.timestamp():
@@ -238,8 +248,8 @@ def delete_metric_data(self, key=None, tags=None):
         deletes all metrics if neither provided
         """
         if key and tags:
-            metric_id = find_metric(self.get_db, key, tags)
-            self.get_db.delete(index=key, id=metric_id)
+            metric_id, index = find_metric(self.get_db, key, tags)
+            self.get_db.delete(index=index, id=metric_id)
         elif key:
             self.get_db.indices.delete(index=key, ignore=[400, 404])
         else:
@@ -252,14 +262,19 @@ def validate_query(self, query):
             query = json.loads(query)
         # Elasticsearch currently supports validation of only query section,
         # aggs, size, _source etc. are not supported
-        valid_check = self.get_db.indices.validate_query(body={'query': query['query']}, explain=True)
+        valid_check = self.get_db.indices.validate_query(
+            body={'query': query['query']}, explain=True
+        )
         # Show a helpful message for failure
         if not valid_check['valid']:
-            raise ValidationError(valid_check['explanations'])
+            raise ValidationError(valid_check['error'])
         return self._is_aggregate(query)
 
+    # TODO: This is not covering everything
     def _is_aggregate(self, q):
-        agg_dict = q['aggs']['GroupByTime']['aggs'].values()
+        agg_dict = q['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][
+            'time'
+        ]['aggs']['nest']['nested']['aggs'].values()
         agg = []
         for item in agg_dict:
             agg.append(next(iter(item)))
@@ -276,45 +291,78 @@ def get_query(
         query=None,
         timezone=settings.TIME_ZONE,
     ):
-        query['key'] = params.pop('key')
         query = json.dumps(query)
         for k, v in params.items():
             query = query.replace('{' + k + '}', v)
         query = self._group_by(query, time, chart_type, group_map, strip=summary)
         query = json.loads(query)
-        if summary:
-            _range = next(
-                (item for item in query['query']['bool']['must'] if item.get('range')),
-                None,
+        set_range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']['time']
+        if fields:
+            aggregate_dict = set_range['aggs']['nest']['nested']['aggs']
+            agg = deepcopy(aggregate_dict).popitem()[1].popitem()[0]
+            aggregate_dict.update(
+                {
+                    f'{field}': {agg: {'field': f'points.fields.{field}'}}
+                    for field in fields
+                }
             )
-            if _range:
-                query['query']['bool']['must'].remove(_range)
-        query['aggs']['GroupByTime']['date_histogram']['time_zone'] = timezone
+        try:
+            set_range['date_histogram']['time_zone'] = timezone
+        except KeyError:
+            pass
         return query
 
     def _group_by(self, query, time, chart_type, group_map, strip=False):
-        if not self.validate_query(query):
-            return query
+        query = query.replace('1d/d', f'{time}/d')
         if not strip and not chart_type == 'histogram':
             value = group_map[time]
-            query = query.replace('1d/d', f'{time}/d')
             query = query.replace('10m', value)
         if strip:
             query = query.replace('10m', time)
         return query
 
-    # TODO:
     def _get_top_fields(
         self,
-        query,
         params,
         chart_type,
        group_map,
         number,
         time,
+        query=None,
         timezone=settings.TIME_ZONE,
+        get_fields=True,
+        **kwargs,
     ):
-        pass
+        """
+        Returns the top fields if ``get_fields`` is set to ``True`` (default);
+        otherwise returns points containing the top fields.
+        """
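+        # the index mapping is inspected to build the candidate field list,
+        # since it records every field that has been written to the index;
+        # the fields are then ranked with Counter.most_common below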
+ """ + response = self.get_db.indices.get_mapping(index=params['key']) + fields = [ + k + for k, v in list(response.values())[0]['mappings']['properties']['points'][ + 'properties' + ]['fields']['properties'].items() + ] + query = self.get_query( + chart_type, + params, + time, + group_map, + summary=True, + fields=fields, + query=query, + timezone=timezone, + ) + point = self.get_list_query(query)[0] + time = point.pop('time') + point = Counter(point).most_common(number) + if get_fields: + return [k for k, v in point] + points = [{'time': time}] + for k, v in point: + points[0].update({k: v}) + return points def _format(self, point): pt = {} @@ -322,7 +370,9 @@ def _format(self, point): pt['time'] = int(point['key'] / 1000) for key, value in point.items(): if isinstance(value, dict): - pt[key] = self._transform_field(key, value['value']) + for k, v in value.items(): + if isinstance(v, dict): + pt[k] = self._transform_field(k, v['value']) return pt def _transform_field(self, field, value): @@ -338,12 +388,10 @@ def _transform_field(self, field, value): def default_chart_query(self, tags): q = deepcopy(default_chart_query) if not tags: - q['query']['bool']['must'].pop(0) - q['query']['bool']['must'].pop(1) + q['query']['nested']['query']['bool']['must'].pop(0) + q['query']['nested']['query']['bool']['must'].pop(1) return q - -# TODO: -# Fix Average - currently it's computing average over all fields! -# Time Interval - fix range -# Device query + def _device_data(self, key, tags, fields, **kwargs): + """ returns last snapshot of ``device_data`` """ + return self.read(key=key, fields=fields, tags=tags, time_format='isoformat',) diff --git a/openwisp_monitoring/db/backends/elasticsearch/index.py b/openwisp_monitoring/db/backends/elasticsearch/index.py index ac212c582..b15f4d392 100644 --- a/openwisp_monitoring/db/backends/elasticsearch/index.py +++ b/openwisp_monitoring/db/backends/elasticsearch/index.py @@ -36,20 +36,22 @@ def find_metric(client, index, tags, retention_policy=None, add=False): for key, value in tags.items(): tags_dict[f'tags.{key}'] = value q = Q( - 'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()] + 'nested', + path='tags', + query=Q( + 'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()] + ), ) - # if index == 'device_data': - # q = Q('nested', path='tags', query=q) else: q = Q() try: - return list(search.query(q).execute())[0].meta['id'] + result = list(search.query(q).execute())[0].meta + return result['id'], result['index'] except (NotFoundError, AttributeError, IndexError): - return ( - add_doc(client, index, tags, retention_policy=retention_policy)['_id'] - if add - else None - ) + if add: + obj = add_doc(client, index, tags, retention_policy=retention_policy) + return obj['_id'], obj['_index'] + return None def add_doc(client, key, tags, _id=None, retention_policy=None): @@ -60,22 +62,15 @@ def add_doc(client, key, tags, _id=None, retention_policy=None): _id = str(_id or uuid.uuid1()) # Check if index exists if client.indices.exists(index=key): - client.create(index=key, id=_id, body={'tags': tags}) - return {'_id': _id} + index_aliases = client.indices.get_alias(index=key) + for k, v in index_aliases.items(): + if v['aliases'][key]['is_write_index']: + break + client.create(index=k, id=_id, body={'tags': tags}) + return {'_id': _id, '_index': k} # Create a new index if it doesn't exist name = f'{key}-000001' obj = MetricIndex(meta={'id': _id}) - obj.meta.index = name - obj.tags = tags - # TODO: If mappings are put then 
-    # TODO: If mappings are put then aggregations don't work, find why.
-    # Issue similar to https://discuss.elastic.co/t/aggregations-do-not-work-any-more-index-corrupt-resolved/24947/5
-    obj.save(using=client, index=name)
-    client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
-    client.indices.put_settings(body={
-        'number_of_replicas': 0,
-        'lifecycle.name': retention_policy or 'default',
-        'lifecycle.rollover_alias': key,
-    }, index=name)
     obj._index = obj._index.clone(name)
     # Create a new index template if it doesn't exist
     if not client.indices.exists_template(name=key):
@@ -84,6 +79,16 @@
         obj._index.settings(**{'lifecycle.name': retention_policy})
     # index pattern is added for Index Lifecycle Management
     obj._index.as_template(key, f'{key}-*').save(using=client)
+    obj.init(using=client, index=name)
+    obj.meta.index = name
+    obj.tags = tags
+    obj.save(using=client, index=name)
+    client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
+    if retention_policy:
+        client.indices.put_settings(
+            body={'lifecycle.name': retention_policy}, index=name
+        )
+    client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
     return obj.to_dict(include_meta=True)
diff --git a/openwisp_monitoring/db/backends/elasticsearch/queries.py b/openwisp_monitoring/db/backends/elasticsearch/queries.py
index 002e46c32..9a8d2ff2f 100644
--- a/openwisp_monitoring/db/backends/elasticsearch/queries.py
+++ b/openwisp_monitoring/db/backends/elasticsearch/queries.py
@@ -1,35 +1,64 @@
 import operator
 from copy import deepcopy
 
+from openwisp_utils.utils import deep_merge_dicts
+
+from .settings import ADDITIONAL_CHART_OPERATIONS
+
 default_chart_query = {
     'query': {
-        'bool': {
-            'must': [
-                {'match': {'tags.object_id': {'query': '{object_id}'}}},
-                {'match': {'tags.content_type': {'query': '{content_type}'}}},
-                {'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}},
-            ]
-        }
+        'nested': {
+            'path': 'tags',
+            'query': {
+                'bool': {
+                    'must': [
+                        {'match': {'tags.object_id': {'query': '{object_id}'}}},
+                        {'match': {'tags.content_type': {'query': '{content_type}'}}},
+                    ]
+                }
+            },
+        },
     },
-    # 'nested': {
-    #     'path': 'points',
-    #     'query': {
-    #         'bool': {
-    #             'must': [{'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}}]
-    #         }
-    #     }
-    # }
     '_source': False,
     'size': 0,
     'aggs': {
         'GroupByTime': {
-            'date_histogram': {
-                'field': 'points.time',
-                'fixed_interval': '10m',
-                'format': 'date_time_no_millis',
-                'order': {'_key': 'desc'},
-            },
-            'aggs': {'{field_name}': {'avg': {'field': 'points.fields.{field_name}'}}},
+            'nested': {
+                'path': 'points',
+                'aggs': {
+                    'set_range': {
+                        'filter': {
+                            'range': {
+                                'points.time': {'from': 'now-1d/d', 'to': 'now/d'}
+                            }
+                        },
+                        'aggs': {
+                            'time': {
+                                'date_histogram': {
+                                    'field': 'points.time',
+                                    'fixed_interval': '10m',
+                                    'format': 'date_time_no_millis',
+                                    'order': {'_key': 'desc'},
+                                },
+                                'aggs': {
+                                    'nest': {
+                                        'nested': {
+                                            'path': 'points.fields',
+                                            'aggs': {
+                                                '{field_name}': {
+                                                    'avg': {
+                                                        'field': 'points.fields.{field_name}'
+                                                    }
+                                                }
+                                            },
+                                        }
+                                    },
+                                },
+                            },
+                        },
+                    }
+                },
+            }
         }
     },
 }
@@ -50,11 +79,20 @@
     '/': operator.truediv,
 }
 
+if ADDITIONAL_CHART_OPERATIONS:
+    assert isinstance(ADDITIONAL_CHART_OPERATIONS, dict)
+    for value in ADDITIONAL_CHART_OPERATIONS.values():
+        assert value['operator'] in operator_lookup
+        assert isinstance(value['value'], (int, float))
+    math_map = deep_merge_dicts(math_map, ADDITIONAL_CHART_OPERATIONS)
+
 
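+# helper used by chart configurations (see the README examples) to plug a
+# custom aggregation into the default Elasticsearch chart query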
 def _make_query(aggregation=None):
     query = deepcopy(default_chart_query)
     if aggregation:
-        query['aggs']['GroupByTime']['aggs'] = aggregation
+        query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']['time'][
+            'aggs'
+        ]['nest']['nested']['aggs'] = aggregation
     return query
@@ -85,5 +123,3 @@ def _get_chart_query():
 
 
 chart_query = _get_chart_query()
-
-device_data_query = None
diff --git a/openwisp_monitoring/db/backends/elasticsearch/settings.py b/openwisp_monitoring/db/backends/elasticsearch/settings.py
new file mode 100644
index 000000000..3dfc1e343
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/settings.py
@@ -0,0 +1,3 @@
+from django.conf import settings
+
+ADDITIONAL_CHART_OPERATIONS = getattr(settings, 'OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS', {})
diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py
index 694e4cce5..56b0eec6b 100644
--- a/openwisp_monitoring/db/backends/influxdb/client.py
+++ b/openwisp_monitoring/db/backends/influxdb/client.py
@@ -265,6 +265,7 @@ def __transform_field(self, field, function, operation=None):
 
     def _get_top_fields(
         self,
+        default_query,
         query,
         params,
         chart_type,
@@ -272,9 +273,15 @@ def _get_top_fields(
         number,
         time,
         timezone=settings.TIME_ZONE,
+        get_fields=True,
     ):
+        """
+        Returns the top fields if ``get_fields`` is set to ``True`` (default);
+        otherwise returns points containing the top fields.
+        """
+        q = default_query.replace('{field_name}', '{fields}')
         q = self.get_query(
-            query=query,
+            query=q,
             params=params,
             chart_type=chart_type,
             group_map=group_map,
@@ -283,7 +290,7 @@ def _get_top_fields(
             time=time,
             timezone=timezone,
         )
-        res = list(self.query(q, precision='s').get_points())
+        res = self.get_list_query(q)
         if not res:
             return []
         res = res[0]
@@ -293,10 +300,31 @@ def _get_top_fields(
         keys = list(sorted_dict.keys())
         keys.reverse()
         top = keys[0:number]
-        return [item.replace('sum_', '') for item in top]
+        top_fields = [item.replace('sum_', '') for item in top]
+        if get_fields:
+            return top_fields
+        query = self.get_query(
+            query=query,
+            params=params,
+            chart_type=chart_type,
+            group_map=group_map,
+            summary=True,
+            fields=top_fields,
+            time=time,
+            timezone=timezone,
+        )
+        return self.get_list_query(query)
 
     def default_chart_query(self, tags):
         q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'"
         if tags:
             q += " AND content_type = '{content_type}' AND object_id = '{object_id}'"
         return q
+
+    def _device_data(self, key, tags, rp, **kwargs):
+        """ returns last snapshot of ``device_data`` """
+        query = (
+            f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' "
+            "ORDER BY time DESC LIMIT 1"
+        )
+        return self.get_list_query(query, precision=None)
diff --git a/openwisp_monitoring/db/backends/influxdb/queries.py b/openwisp_monitoring/db/backends/influxdb/queries.py
index 2fd426fd9..7bc5b42e5 100644
--- a/openwisp_monitoring/db/backends/influxdb/queries.py
+++ b/openwisp_monitoring/db/backends/influxdb/queries.py
@@ -58,7 +58,3 @@
         )
     },
 }
-
-device_data_query = (
-    "SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1"
-)
diff --git a/openwisp_monitoring/db/backends/influxdb/tests.py b/openwisp_monitoring/db/backends/influxdb/tests.py
index 90e1eebd0..0bf274b61 100644
--- a/openwisp_monitoring/db/backends/influxdb/tests.py
+++ b/openwisp_monitoring/db/backends/influxdb/tests.py
@@ -187,10 +187,10 @@ def test_query_set(self):
         )
         self.assertEqual(c.query, expected)
         self.assertEqual(
-            ''.join(timeseries_db.queries.default_chart_query[0:2]), c._default_query
+            ''.join(timeseries_db.default_chart_query(tags=c.metric.tags)), c._default_query
         )
         c.metric.object_id = None
-        self.assertEqual(timeseries_db.queries.default_chart_query[0], c._default_query)
+        self.assertEqual(timeseries_db.default_chart_query(tags=None), c._default_query)
 
     def test_read_order(self):
         m = self._create_general_metric(name='dummy')
diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py
index d15986406..e979543c6 100644
--- a/openwisp_monitoring/device/base/models.py
+++ b/openwisp_monitoring/device/base/models.py
@@ -20,7 +20,7 @@
 
 from openwisp_utils.base import TimeStampedEditableModel
 
-from ...db import device_data_query, timeseries_db
+from ...db import timeseries_db
 from ...monitoring.signals import threshold_crossed
 from .. import settings as app_settings
 from ..schema import schema
@@ -97,16 +97,9 @@ def data(self):
         if self.__data:
             return self.__data
         # skipped this due to performance and inverted index issues!
-        if not timeseries_db.backend_name == 'elasticsearch':
-            q = device_data_query.format(SHORT_RP, self.__key, self.pk)
-            points = timeseries_db.get_list_query(q, precision=None)
-        else:
-            points = timeseries_db.read(
-                key=self.__key,
-                fields='data',
-                tags={'pk': self.pk},
-                time_format='isoformat',
-            )
+        points = timeseries_db._device_data(
+            rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data'
+        )
         if not points:
             return None
         self.data_timestamp = points[0]['time']
diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py
index 6670c5685..12b31ac7f 100644
--- a/openwisp_monitoring/monitoring/base/models.py
+++ b/openwisp_monitoring/monitoring/base/models.py
@@ -341,10 +341,10 @@ def get_top_fields(self, number):
         Returns list of top ``number`` of fields (highest sum) of a
         measurement in the specified time range (descending order).
""" - q = self._default_query.replace('{field_name}', '{fields}') params = self._get_query_params(self.DEFAULT_TIME) return timeseries_db._get_top_fields( - query=q, + default_query=self._default_query, + query=self.get_query(), chart_type=self.type, group_map=self.GROUP_MAP, number=number, @@ -389,16 +389,21 @@ def read( try: query_kwargs = dict(time=time, timezone=timezone) if self.top_fields: - fields = self.get_top_fields(self.top_fields) - data_query = self.get_query(fields=fields, **query_kwargs) - summary_query = self.get_query( - fields=fields, summary=True, **query_kwargs + points = summary = timeseries_db._get_top_fields( + default_query=self._default_query, + chart_type=self.type, + group_map=self.GROUP_MAP, + number=self.top_fields, + params=self._get_query_params(self.DEFAULT_TIME), + time=time, + query=self.query, + get_fields=False, ) else: data_query = self.get_query(**query_kwargs) summary_query = self.get_query(summary=True, **query_kwargs) - points = timeseries_db.get_list_query(data_query) - summary = timeseries_db.get_list_query(summary_query) + points = timeseries_db.get_list_query(data_query) + summary = timeseries_db.get_list_query(summary_query) except timeseries_db.client_error as e: logging.error(e, exc_info=True) raise e diff --git a/openwisp_monitoring/monitoring/charts.py b/openwisp_monitoring/monitoring/charts.py index 6de3ec5e2..249f864f8 100644 --- a/openwisp_monitoring/monitoring/charts.py +++ b/openwisp_monitoring/monitoring/charts.py @@ -1,6 +1,7 @@ from django.core.exceptions import ImproperlyConfigured from django.utils.translation import gettext_lazy as _ from openwisp_monitoring.db import chart_query +from openwisp_monitoring.db.backends.elasticsearch.queries import _make_query from openwisp_utils.utils import deep_merge_dicts