From 044a29f6535483358b6d334355157d77dddb2b5b Mon Sep 17 00:00:00 2001 From: Hardik Jain Date: Tue, 7 Jul 2020 07:31:46 +0530 Subject: [PATCH] [timeseries] Add initial support for elasticsearch #99 Closes #99 --- Dockerfile | 3 +- README.rst | 41 +- docker-compose.yml | 48 ++ openwisp_monitoring/db/__init__.py | 4 +- openwisp_monitoring/db/backends/__init__.py | 3 +- .../db/backends/elasticsearch/__init__.py | 3 + .../db/backends/elasticsearch/client.py | 466 ++++++++++++++++++ .../db/backends/elasticsearch/index.py | 92 ++++ .../db/backends/elasticsearch/queries.py | 130 +++++ .../elasticsearch/retention_policies.py | 41 ++ .../db/backends/elasticsearch/settings.py | 5 + .../elasticsearch/tests/__init__.py} | 0 .../elasticsearch/tests/client_tests.py | 296 +++++++++++ .../db/backends/influxdb/client.py | 43 +- .../db/backends/influxdb/queries.py | 9 - .../db/backends/influxdb/tests/__init__.py | 0 .../{tests.py => tests/client_tests.py} | 110 +++-- openwisp_monitoring/db/tests.py | 55 +++ openwisp_monitoring/device/base/models.py | 7 +- openwisp_monitoring/device/tests/test_api.py | 6 +- openwisp_monitoring/monitoring/base/models.py | 37 +- .../monitoring/tests/__init__.py | 30 +- .../monitoring/tests/test_charts.py | 57 --- requirements.txt | 1 + setup.py | 8 +- tests/openwisp2/settings.py | 16 +- 26 files changed, 1356 insertions(+), 155 deletions(-) create mode 100644 openwisp_monitoring/db/backends/elasticsearch/__init__.py create mode 100644 openwisp_monitoring/db/backends/elasticsearch/client.py create mode 100644 openwisp_monitoring/db/backends/elasticsearch/index.py create mode 100644 openwisp_monitoring/db/backends/elasticsearch/queries.py create mode 100644 openwisp_monitoring/db/backends/elasticsearch/retention_policies.py create mode 100644 openwisp_monitoring/db/backends/elasticsearch/settings.py rename openwisp_monitoring/db/{utils.py => backends/elasticsearch/tests/__init__.py} (100%) create mode 100644 openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py create mode 100644 openwisp_monitoring/db/backends/influxdb/tests/__init__.py rename openwisp_monitoring/db/backends/influxdb/{tests.py => tests/client_tests.py} (80%) create mode 100644 openwisp_monitoring/db/tests.py diff --git a/Dockerfile b/Dockerfile index 44e0b890a..fff279ac6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,6 +18,7 @@ WORKDIR /opt/openwisp/tests/ ENV NAME=openwisp-monitoring \ PYTHONBUFFERED=1 \ INFLUXDB_HOST=influxdb \ - REDIS_HOST=redis + REDIS_HOST=redis \ + ELASTICSEARCH_HOST=es01 CMD ["sh", "docker-entrypoint.sh"] EXPOSE 8000 diff --git a/README.rst b/README.rst index 7a945b055..4f0c07043 100644 --- a/README.rst +++ b/README.rst @@ -74,6 +74,7 @@ Available Features * Collects and displays `device status <#device-status>`_ information like uptime, RAM status, CPU load averages, Interface properties and addresses, WiFi interface status and associated clients, Neighbors information, DHCP Leases, Disk/Flash status +* Collection of monitoring information in a timeseries database (`InfluxDB `_ and `Elasticsearch `_ are currently supported) * Monitoring charts for uptime, packet loss, round trip time (latency), associated wifi clients, interface traffic, RAM usage, CPU load, flash/disk usage * Charts can be viewed at resolutions of 1 day, 3 days, a week, a month and a year @@ -108,6 +109,8 @@ beforehand. In case you prefer not to use Docker you can `install InfluxDB `_ and Redis from your repositories, but keep in mind that the version packaged by your distribution may be different. 
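+
+``Elasticsearch`` (supported as an alternate timeseries backend, see below) can
+also be run quickly with Docker for development; a minimal single-node example
+(illustrative only, the ``docker-compose.yml`` in this repository runs a
+two-node cluster instead):
+
+.. code-block:: shell
+
+    docker run -d -p 9200:9200 -e "discovery.type=single-node" \
+        docker.elastic.co/elasticsearch/elasticsearch:7.8.0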
+If you wish to use ``Elasticsearch`` for storing and retrieving timeseries data, then `install Elasticsearch `_.
+
 Install spatialite and sqlite:
 
 .. code-block:: shell
@@ -165,6 +168,20 @@ Follow the setup instructions of `openwisp-controller
         'PORT': '8086',
     }
 
+If you wish to use ``Elasticsearch`` for timeseries data storage and retrieval,
+use the following settings instead:
+
+.. code-block:: python
+
+    TIMESERIES_DATABASE = {
+        'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
+        'USER': 'openwisp',
+        'PASSWORD': 'openwisp',
+        'NAME': 'openwisp2',
+        'HOST': 'localhost',
+        'PORT': '9200',
+    }
+
 ``urls.py``:
 
 .. code-block:: python
 
@@ -461,6 +478,9 @@ This data is only used to assess the recent status of devices, keeping
 it for a long time would not add much benefit and would cost a lot more
 in terms of disk space.
 
+**Note**: if you use ``Elasticsearch``, the retention time is truncated to an
+integral multiple of a day, e.g. ``36h0m0s`` is interpreted as ``24h0m0s``.
+
 ``OPENWISP_MONITORING_AUTO_PING``
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -764,11 +784,21 @@ MB (megabytes) instead of GB (Gigabytes) you can use:
                     "SUM(rx_bytes) / 1000000 AS download FROM {key} "
                     "WHERE time >= '{time}' AND content_type = '{content_type}' "
                     "AND object_id = '{object_id}' GROUP BY time(1d)"
-                )
+                ),
+                'elasticsearch': _make_query({
+                    'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
+                    'download': {'sum': {'field': 'points.fields.rx_bytes'}},
+                })
             },
         }
     }
 
+    # when using elasticsearch, the operations must be declared separately, as shown below
+    OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = {
+        'upload': {'operator': '/', 'value': 1000000},
+        'download': {'operator': '/', 'value': 1000000},
+    }
+
 Or if you want to define a new chart configuration, which you can then
 call in your custom code (eg: a custom check class), you can do so as follows:
 
@@ -776,6 +806,8 @@ call in your custom code (eg: a custom check class), you can do so as follows:
 
     from django.utils.translation import gettext_lazy as _
 
+    from openwisp_monitoring.db.backends.elasticsearch import _make_query
+
     OPENWISP_MONITORING_CHARTS = {
         'ram': {
             'type': 'line',
@@ -789,7 +821,12 @@ call in your custom code (eg: a custom check class), you can do so as follows:
                     "MEAN(buffered) AS buffered FROM {key} WHERE time >= '{time}' AND "
                     "content_type = '{content_type}' AND object_id = '{object_id}' "
                     "GROUP BY time(1d)"
-                )
+                ),
+                'elasticsearch': _make_query({
+                    'total': {'avg': {'field': 'points.fields.total'}},
+                    'free': {'avg': {'field': 'points.fields.free'}},
+                    'buffered': {'avg': {'field': 'points.fields.buffered'}},
+                })
             },
         }
     }
diff --git a/docker-compose.yml b/docker-compose.yml
index 6386355ed..795fed19a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,6 +11,8 @@ services:
     depends_on:
       - influxdb
      - redis
+      - es01
+      - es02
 
   influxdb:
     image: influxdb:1.8-alpine
@@ -22,6 +24,45 @@ services:
       INFLUXDB_DB: openwisp2
       INFLUXDB_USER: openwisp
       INFLUXDB_USER_PASSWORD: openwisp
+  # a clustered elasticsearch setup is used since that is more likely in production
+  es01:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
+    container_name: es01
+    environment:
+      - "node.name=es01"
+      - "discovery.seed_hosts=es02"
+      - "cluster.initial_master_nodes=es01,es02"
+      - "cluster.name=openwisp2"
+      - "bootstrap.memory_lock=true"
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    volumes:
+      - esdata01:/usr/share/elasticsearch/data
+    ports:
+      - 9200:9200
+    networks:
+      - esnet
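+  # es02 mirrors es01: the second node makes this an actual cluster,
+  # closer to what a production deployment would look like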
+  es02:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
+    container_name: es02
+    environment:
+      - "node.name=es02"
+      - "discovery.seed_hosts=es01"
+      - "cluster.initial_master_nodes=es01,es02"
+      - "cluster.name=openwisp2"
+      - "bootstrap.memory_lock=true"
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    volumes:
+      - esdata02:/usr/share/elasticsearch/data
+    networks:
+      - esnet
 
   redis:
     image: redis:5.0-alpine
@@ -31,3 +72,10 @@
 
 volumes:
   influxdb-data: {}
+  esdata01:
+    driver: local
+  esdata02:
+    driver: local
+
+networks:
+  esnet:
diff --git a/openwisp_monitoring/db/__init__.py b/openwisp_monitoring/db/__init__.py
index 063d2d8f7..64510ebc5 100644
--- a/openwisp_monitoring/db/__init__.py
+++ b/openwisp_monitoring/db/__init__.py
@@ -1,7 +1,5 @@
 from .backends import timeseries_db
 
 chart_query = timeseries_db.queries.chart_query
-default_chart_query = timeseries_db.queries.default_chart_query
-device_data_query = timeseries_db.queries.device_data_query
 
-__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query']
+__all__ = ['timeseries_db', 'chart_query']
diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py
index 715b1113c..b04e09db6 100644
--- a/openwisp_monitoring/db/backends/__init__.py
+++ b/openwisp_monitoring/db/backends/__init__.py
@@ -48,7 +48,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
     except ImportError as e:
         # The database backend wasn't found. Display a helpful error message
         # listing all built-in database backends.
-        builtin_backends = ['influxdb']
+        builtin_backends = ['influxdb', 'elasticsearch']
         if backend_name not in [
             f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
         ]:
diff --git a/openwisp_monitoring/db/backends/elasticsearch/__init__.py b/openwisp_monitoring/db/backends/elasticsearch/__init__.py
new file mode 100644
index 000000000..5eb061d1b
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/__init__.py
@@ -0,0 +1,3 @@
+from .queries import _make_query
+
+__all__ = ['_make_query']
diff --git a/openwisp_monitoring/db/backends/elasticsearch/client.py b/openwisp_monitoring/db/backends/elasticsearch/client.py
new file mode 100644
index 000000000..574744e79
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -0,0 +1,466 @@
+import json
+import logging
+import re
+from collections import Counter
+from copy import deepcopy
+from datetime import datetime, timedelta
+
+from django.conf import settings
+from django.core.exceptions import ValidationError
+from django.utils.functional import cached_property
+from elasticsearch import Elasticsearch
+from elasticsearch.exceptions import ElasticsearchException, NotFoundError
+from elasticsearch_dsl import Search
+from elasticsearch_dsl.connections import connections
+from pytz import timezone as tz
+
+from openwisp_utils.utils import deep_merge_dicts
+
+from ...exceptions import TimeseriesWriteException
+from .. import TIMESERIES_DB
+from .index import MetricDocument, Point, find_metric
+from .queries import default_chart_query, math_map, operator_lookup
+from .retention_policies import _make_policy, default_rp_policy
+
+logger = logging.getLogger(__name__)
+
+
+class DatabaseClient(object):
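+    # whitelist of the Elasticsearch aggregation types accepted by
+    # ``validate_query()`` / ``_is_aggregate()`` in chart queries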
+    _AGGREGATE = [
+        'filters',
+        'children',
+        'parent',
+        'date_histogram',
+        'auto_date_histogram',
+        'date_range',
+        'geo_distance',
+        'geohash_grid',
+        'geotile_grid',
+        'global',
+        'geo_centroid',
+        'ip_range',
+        'missing',
+        'nested',
+        'range',
+        'reverse_nested',
+        'significant_terms',
+        'significant_text',
+        'sampler',
+        'terms',
+        'diversified_sampler',
+        'composite',
+        'top_hits',
+        'avg',
+        'weighted_avg',
+        'cardinality',
+        'extended_stats',
+        'geo_bounds',
+        'max',
+        'min',
+        'percentiles',
+        'percentile_ranks',
+        'scripted_metric',
+        'stats',
+        'sum',
+        'value_count',
+    ]
+    backend_name = 'elasticsearch'
+
+    def __init__(self, db_name='metric'):
+        self.db_name = db_name or TIMESERIES_DB['NAME']
+        self.client_error = ElasticsearchException
+
+    def create_database(self):
+        """ creates connection to elasticsearch """
+        connections.create_connection(
+            hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"]
+        )
+        db = self.get_db
+        # Skip if support for Index Lifecycle Management is disabled or no privileges
+        self.ilm_enabled = db.ilm.start()['acknowledged']
+        self.create_or_alter_retention_policy(name='default')
+
+    def drop_database(self):
+        """ deletes all indices """
+        logger.debug('Deleted all indices data from Elasticsearch')
+
+    @cached_property
+    def get_db(self):
+        """ Returns an ``Elasticsearch Client`` instance """
+        return Elasticsearch(
+            hosts=[{'host': TIMESERIES_DB['HOST'], 'port': TIMESERIES_DB['PORT']}],
+            http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
+            retry_on_timeout=True,
+            # sniff before doing anything
+            sniff_on_start=True,
+            # refresh nodes after a node fails to respond
+            sniff_on_connection_fail=True,
+        )
+
+    def create_or_alter_retention_policy(self, name, duration=None):
+        """
+        creates or alters existing retention policy if necessary
+
+        Note: default retention policy can't be altered with this function
+        """
+        if not self.ilm_enabled:
+            return
+        ilm = self.get_db.ilm
+        if not duration:
+            try:
+                ilm.get_lifecycle(policy='default')
+            except NotFoundError:
+                ilm.put_lifecycle(policy='default', body=default_rp_policy)
+            return
+        days = f'{int(duration.split("h")[0]) // 24}d'
+        duration_changed = False
+        try:
+            policy = ilm.get_lifecycle(policy=name)
+            exists = True
+            current_duration = policy[name]['policy']['phases']['hot']['actions'][
+                'rollover'
+            ]['max_age']
+            duration_changed = current_duration != days
+        except NotFoundError:
+            exists = False
+        if not exists or duration_changed:
+            policy = _make_policy(days)
+            ilm.put_lifecycle(policy=name, body=policy)
+
+    def get_list_retention_policies(self):
+        if not self.ilm_enabled:
+            return
+        return self.get_db.ilm.get_lifecycle()
+
+    def query(self, query, key=None, **kwargs):
+        try:
+            response = (
+                Search(using=self.get_db, index=key)
+                .update_from_dict(query)
+                .execute()
+                .to_dict()
+            )
+            if not response['hits']['total']['value']:
+                return {}
+            return response
+        except NotFoundError:
+            return {}
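+    # NOTE: a metric is stored as a single document per index; each write
+    # below appends a nested ``Point`` to that document's ``points`` list
+    # and saves it again, no new document is created per sample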
+    def write(self, name, values, **kwargs):
+        rp = kwargs.get('retention_policy')
+        tags = kwargs.get('tags')
+        timestamp = kwargs.get('timestamp')
+        try:
+            metric_id, index = find_metric(self.get_db, name, tags, rp, add=True)
+            metric_index = MetricDocument.get(metric_id, index=index, using=self.get_db)
+            point = Point(fields=values, time=timestamp or datetime.now())
+            metric_index.points.append(point)
+            metric_index.save()
+        except Exception as exception:
+            logger.warning(f'got exception while writing to tsdb: {exception}')
+            raise TimeseriesWriteException
+        # Elasticsearch automatically refreshes indices every second,
+        # but we can't wait that long, especially for tests
+        self.get_db.indices.refresh(index=name)
+
+    def read(self, key, fields, tags=None, limit=1, order='time', **kwargs):
+        """ ``since`` should be of the form ``now() - <seconds>s`` """
+        extra_fields = kwargs.get('extra_fields')
+        time_format = kwargs.get('time_format')
+        since = kwargs.get('since')
+        try:
+            metric_id, index = find_metric(self.get_db, key, tags)
+        except TypeError:
+            return []
+        metric_index = MetricDocument.get(index=index, id=metric_id, using=self.get_db)
+        # keep only points containing the requested field
+        # (this distinguishes e.g. traffic from clients)
+        points = []
+        for point in list(metric_index.points):
+            if fields in point.fields.to_dict():
+                points.append(point)
+        if order == 'time':
+            points = points[0:limit]
+        elif order == '-time':
+            points = list(reversed(points))[0:limit]
+        else:
+            raise self.client_error(
+                f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
+                'results sorted in ascending / descending order respectively.'
+            )
+        if extra_fields and extra_fields != '*':
+            assert isinstance(extra_fields, list)
+            for count, point in enumerate(points):
+                fields_dict = point.to_dict()['fields']
+                point = {
+                    'time': self._format_time(point['time'], time_format),
+                    fields: fields_dict[fields],
+                }
+                for extra_field in extra_fields:
+                    if fields_dict.get(extra_field) is not None:
+                        point.update({extra_field: fields_dict[extra_field]})
+                points[count] = point
+        elif extra_fields == '*':
+            for count, point in enumerate(points):
+                points[count] = deep_merge_dicts(
+                    point.fields.to_dict(),
+                    {'time': self._format_time(point.time, time_format)},
+                )
+        else:
+            points = [
+                deep_merge_dicts(
+                    {fields: p.fields.to_dict()[fields]},
+                    {'time': self._format_time(p.time, time_format)},
+                )
+                for p in points
+            ]
+        if since:
+            since = datetime.now().timestamp() - int(re.search(r'\d+', since).group())
+            points = [point for point in points if point['time'] >= since]
+        return points
+
+    def _format_time(self, obj, time_format=None):
+        """ returns a datetime object in isoformat / unix timestamp (UTC timezone) """
+        if time_format == 'isoformat':
+            return obj.astimezone(tz=tz('UTC')).isoformat(timespec='seconds')
+        return int(obj.astimezone(tz=tz('UTC')).timestamp())
+
+    def get_list_query(self, query, precision='s', key=None):
+        response = self.query(query, key)
+        try:
+            points = response['aggregations']['GroupByTime']['set_range']['time'][
+                'buckets'
+            ]
+            list_points = self._fill_points(
+                query, [self._format(point, precision) for point in points],
+            )
+            list_points.reverse()
+        except KeyError:
+            return []
+        return list_points
+
+    def _fill_points(self, query, points):
+        _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']
+        days = int(_range['filter']['range']['points.time']['from'][4:-3])
+        # return if summary query
+        try:
+            interval = _range['aggs']['time']['date_histogram']['fixed_interval']
+        except KeyError:
+            return points
+        # return if top_fields query
+        if interval == '365d':
+            return points
+        interval_dict = {'10m': 600, '20m': 1200, '1h': 3600, '24h': 86400}
+        interval = interval_dict[interval]
+        start_time = datetime.now()
+        end_time = start_time - 
timedelta(days=days) # include today + dummy_point = deepcopy(points[0]) + start_ts = points[0]['time'] + interval + end_ts = points[-1]['time'] - interval + for field in dummy_point.keys(): + dummy_point[field] = None if field != 'wifi_clients' else 0 + while start_ts < start_time.timestamp(): + dummy_point['time'] = start_ts + points.insert(0, deepcopy(dummy_point)) + start_ts += interval + # TODO: This is required due to time_zone issues + while points[-1]['time'] < end_time.timestamp(): + points.pop(-1) + while end_ts > end_time.timestamp(): + dummy_point['time'] = end_ts + points.append(deepcopy(dummy_point)) + end_ts -= interval + return points + + def delete_metric_data(self, key=None, tags=None): + """ + deletes a specific metric based on given key and tags; + deletes all metrics if neither provided + """ + if key and tags: + metric_id, index = find_metric(self.get_db, key, tags) + self.get_db.delete(index=index, id=metric_id) + elif key: + self.get_db.indices.delete_alias( + index=f'{key}-*', name=key, ignore=[400, 404] + ) + else: + self.get_db.indices.delete(index='*', ignore=[400, 404]) + # TODO: Speed up tests by retaining indices, almost 50s difference on travis + # try: + # self.get_db.delete_by_query(index='_all', body={'query': {'match_all': {}}}) + # except self.client_error: + # pass + + # Chart related functions below + + def validate_query(self, query): + if not isinstance(query, dict): + raise ValidationError( + {'configuration': f'error parsing query: found {query}'} + ) + # Elasticsearch currently supports validation of only query section, + # aggs, size, _source etc. are not supported + try: + valid_check = self.get_db.indices.validate_query( + body={'query': query['query']}, explain=True + ) + except NotFoundError: + return True + # Show a helpful message for failure + if not valid_check['valid']: + raise ValidationError(valid_check['error']) + return self._is_aggregate(query) + + def _is_aggregate(self, query): + agg_dict = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][ + 'time' + ]['aggs']['nest']['nested']['aggs'].values() + agg = [] + for item in agg_dict: + agg.append(next(iter(item))) + is_aggregate = True if set(agg) <= set(self._AGGREGATE) else False + return is_aggregate if 'size' in query else False + + def get_query( + self, + chart_type, + params, + time, + group_map, + summary=False, + fields=None, + query=None, + timezone=settings.TIME_ZONE, + ): + if not params.get('object_id'): + del query['query'] + query = json.dumps(query) + for k, v in params.items(): + query = query.replace('{' + k + '}', v) + query = self._group_by(query, time, chart_type, group_map, strip=summary) + set_range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][ + 'time' + ] + if fields: + aggregate_dict = set_range['aggs']['nest']['nested']['aggs'] + agg = deepcopy(aggregate_dict).popitem()[1].popitem()[0] + aggregate_dict.update( + { + f'{field}': {agg: {'field': f'points.fields.{field}'}} + for field in fields + } + ) + try: + set_range['date_histogram']['time_zone'] = timezone + except KeyError: + pass + return query + + def _group_by(self, query, time, chart_type, group_map, strip=False): + query = query.replace('1d/d', f'{time}/d') + if not strip and not chart_type == 'histogram': + value = group_map[time] + query = query.replace('10m', value) + if strip: + query = json.loads(query) + _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'] + _range['time'].pop('date_histogram') + 
_range['time']['auto_date_histogram'] = { + 'field': 'points.time', + 'format': 'date_time_no_millis', + 'buckets': 1, + } + if isinstance(query, str): + query = json.loads(query) + return query + + def _get_top_fields( + self, + params, + chart_type, + group_map, + number, + query=None, + timezone=settings.TIME_ZONE, + get_fields=True, + **kwargs, + ): + """ + Returns top fields if ``get_fields`` set to ``True`` (default) + else it returns points containing the top fields. + """ + try: + response = self.get_db.indices.get_mapping(index=params['key']) + except NotFoundError: + return [] + fields = [ + k + for k, v in list(response.values())[0]['mappings']['properties']['points'][ + 'properties' + ]['fields']['properties'].items() + ] + query = self.get_query( + chart_type=chart_type, + params=params, + time='365d', + group_map=group_map, + summary=True, + fields=fields, + query=query, + timezone=timezone, + ) + point = self.get_list_query(query, key=params['key'])[0] + time = point.pop('time') + point = Counter(point).most_common(number) + if get_fields: + return [k for k, v in point] + points = [{'time': time}] + for k, v in point: + points[0][k] = v + return points + + def _format(self, point, precision='s'): + """ allowed values for precision are ``s`` and ``ms`` """ + pt = {} + # By default values returned are in millisecond precision + if precision == 'ms': + pt['time'] = point['key'] / 1000 + else: + pt['time'] = point['key'] // 1000 + for key, value in point.items(): + if isinstance(value, dict): + for k, v in value.items(): + if isinstance(v, dict): + pt[k] = self._transform_field(k, v['value']) + return pt + + def _transform_field(self, field, value): + """ Performs arithmetic operations on the field if required """ + if value is None: + return value + if field in math_map: + op = operator_lookup.get(math_map[field]['operator']) + if op is not None: + value = op(value, math_map[field]['value']) + return value + + def default_chart_query(self, tags=False): + q = deepcopy(default_chart_query) + # This is used to distinguish that it's default query + del q['size'] + if not tags: + q['query']['nested']['query']['bool']['must'] = [] + return q + + def _device_data(self, key, tags, fields, **kwargs): + """ returns last snapshot of ``device_data`` """ + return self.read( + key=key, fields=fields, tags=tags, order='-time', time_format='isoformat', + ) + + +# TODO: +# _fill_points has a while which shouldn't be required diff --git a/openwisp_monitoring/db/backends/elasticsearch/index.py b/openwisp_monitoring/db/backends/elasticsearch/index.py new file mode 100644 index 000000000..ae6b6aad4 --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/index.py @@ -0,0 +1,92 @@ +import uuid + +from django.conf import settings +from elasticsearch.exceptions import NotFoundError +from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search + + +class Point(InnerDoc): + time = Date(required=True, default_timezone=settings.TIME_ZONE) + fields = Nested(dynamic=True, required=True, multi=True) + + +class MetricDocument(Document): + tags = Nested(dynamic=True, required=False, multi=True) + points = Nested(Point) + + class Index: + name = 'metric' + settings = { + 'number_of_shards': 1, + 'number_of_replicas': 0, + 'lifecycle.name': 'default', + 'lifecycle.rollover_alias': 'metric', + } + + +def find_metric(client, index, tags, retention_policy=None, add=False): + search = Search(using=client, index=index) + if tags: + tags_dict = dict() + for key, value in tags.items(): + 
tags_dict[f'tags.{key}'] = value + q = Q( + 'nested', + path='tags', + query=Q( + 'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()] + ), + ) + else: + q = Q() + try: + result = list(search.query(q).execute())[0].meta + return result['id'], result['index'] + except (NotFoundError, AttributeError, IndexError): + if add: + document = create_document( + client, index, tags, retention_policy=retention_policy + ) + return document['_id'], document['_index'] + return None + + +def create_document(client, key, tags, _id=None, retention_policy=None): + """ + Adds document to relevant index using ``keys``, ``tags`` and ``id`` provided. + If no ``id`` is provided a random ``uuid`` would be used. + """ + _id = str(_id or uuid.uuid1()) + # If index exists, create the document and return + try: + index_aliases = client.indices.get_alias(index=key) + for k, v in index_aliases.items(): + if v['aliases'][key]['is_write_index']: + break + client.create(index=k, id=_id, body={'tags': tags}) + return {'_id': _id, '_index': k} + except NotFoundError: + pass + # Create a new index if it doesn't exist + name = f'{key}-000001' + document = MetricDocument(meta={'id': _id}) + document._index = document._index.clone(name) + # Create a new index template if it doesn't exist + if not client.indices.exists_template(name=key): + document._index.settings(**{'lifecycle.rollover_alias': key}) + if retention_policy: + document._index.settings(**{'lifecycle.name': retention_policy}) + # add index pattern is added for Index Lifecycle Management + document._index.as_template(key, f'{key}-*').save(using=client) + document.init(using=client, index=name) + document.meta.index = name + document.tags = tags + document.save(using=client, index=name) + client.indices.put_alias(index=name, name=key, body={'is_write_index': True}) + if retention_policy: + client.indices.put_settings( + body={'lifecycle.name': retention_policy}, index=name + ) + client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name) + client.indices.refresh(index=key) + return document.to_dict(include_meta=True) diff --git a/openwisp_monitoring/db/backends/elasticsearch/queries.py b/openwisp_monitoring/db/backends/elasticsearch/queries.py new file mode 100644 index 000000000..bdae617eb --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/queries.py @@ -0,0 +1,130 @@ +import operator +from copy import deepcopy + +from openwisp_utils.utils import deep_merge_dicts + +from .settings import ADDITIONAL_CHART_OPERATIONS + +default_chart_query = { + 'query': { + 'nested': { + 'path': 'tags', + 'query': { + 'bool': { + 'must': [ + {'match': {'tags.object_id': {'query': '{object_id}'}}}, + {'match': {'tags.content_type': {'query': '{content_type}'}}}, + ] + } + }, + }, + }, + '_source': False, + 'size': 0, + 'aggs': { + 'GroupByTime': { + 'nested': { + 'path': 'points', + 'aggs': { + 'set_range': { + 'filter': { + 'range': { + 'points.time': {'from': 'now-1d/d', 'to': 'now/d'} + } + }, + 'aggs': { + 'time': { + 'date_histogram': { + 'field': 'points.time', + 'fixed_interval': '10m', + 'format': 'date_time_no_millis', + 'order': {'_key': 'desc'}, + }, + 'aggs': { + 'nest': { + 'nested': { + 'path': 'points.fields', + 'aggs': { + '{field_name}': { + 'avg': { + 'field': 'points.fields.{field_name}' + } + } + }, + } + }, + }, + }, + }, + } + }, + } + } + }, +} + +math_map = { + 'uptime': {'operator': '*', 'value': 100}, + 'memory_usage': {'operator': '*', 'value': 100}, + 'CPU_load': {'operator': '*', 'value': 100}, + 
'disk_usage': {'operator': '*', 'value': 100},
+    'upload': {'operator': '/', 'value': 1000000000},
+    'download': {'operator': '/', 'value': 1000000000},
+}
+
+operator_lookup = {
+    '+': operator.add,
+    '-': operator.sub,
+    '*': operator.mul,
+    '/': operator.truediv,
+}
+
+if ADDITIONAL_CHART_OPERATIONS:
+    assert isinstance(ADDITIONAL_CHART_OPERATIONS, dict)
+    for value in ADDITIONAL_CHART_OPERATIONS.values():
+        assert value['operator'] in operator_lookup
+        assert isinstance(value['value'], (int, float))
+    math_map = deep_merge_dicts(math_map, ADDITIONAL_CHART_OPERATIONS)
+
+
+def _make_query(aggregation=None):
+    query = deepcopy(default_chart_query)
+    if aggregation:
+        query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']['time'][
+            'aggs'
+        ]['nest']['nested']['aggs'] = aggregation
+    return query
+
+
+def _get_chart_query():
+    aggregation_dict = {
+        'uptime': {'uptime': {'avg': {'field': 'points.fields.reachable'}}},
+        'packet_loss': {'packet_loss': {'avg': {'field': 'points.fields.loss'}}},
+        'rtt': {
+            'RTT_average': {'avg': {'field': 'points.fields.rtt_avg'}},
+            'RTT_max': {'avg': {'field': 'points.fields.rtt_max'}},
+            'RTT_min': {'avg': {'field': 'points.fields.rtt_min'}},
+        },
+        'traffic': {
+            'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
+            'download': {'sum': {'field': 'points.fields.rx_bytes'}},
+        },
+        'wifi_clients': {
+            'wifi_clients': {
+                'cardinality': {
+                    'field': 'points.fields.{field_name}.keyword',
+                    'missing': 0,
+                }
+            }
+        },
+        'memory': {'memory_usage': {'avg': {'field': 'points.fields.percent_used'}}},
+        'cpu': {'CPU_load': {'avg': {'field': 'points.fields.cpu_usage'}}},
+        'disk': {'disk_usage': {'avg': {'field': 'points.fields.used_disk'}}},
+    }
+    query = {}
+    for key, value in aggregation_dict.items():
+        query[key] = {'elasticsearch': _make_query(value)}
+    return query
+
+
+chart_query = _get_chart_query()
diff --git a/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py
new file mode 100644
index 000000000..d4269959e
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py
@@ -0,0 +1,41 @@
+# By default, age is calculated from the date the index is created, but if the
+# index has been rolled over, then the rollover date is used to calculate the age
+
+default_rp_policy = {
+    'policy': {
+        'phases': {
+            'hot': {
+                'actions': {
+                    'rollover': {'max_age': '30d', 'max_size': '90G'},
+                    'set_priority': {'priority': 100},
+                }
+            },
+            'warm': {
+                'min_age': '30d',
+                'actions': {
+                    'forcemerge': {'max_num_segments': 1},
+                    'allocate': {'number_of_replicas': 0},
+                    'set_priority': {'priority': 50},
+                },
+            },
+            'cold': {'min_age': '150d', 'actions': {'freeze': {}}},
+            'delete': {'min_age': '335d', 'actions': {'delete': {}}},
+        }
+    }
+}
+
+
+def _make_policy(max_age):
+    return {
+        'policy': {
+            'phases': {
+                'hot': {
+                    'actions': {
+                        'rollover': {'max_age': max_age},
+                        'set_priority': {'priority': 100},
+                    }
+                },
+                'delete': {'actions': {'delete': {}}},
+            }
+        }
+    }
diff --git a/openwisp_monitoring/db/backends/elasticsearch/settings.py b/openwisp_monitoring/db/backends/elasticsearch/settings.py
new file mode 100644
index 000000000..8bb6f1159
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/settings.py
@@ -0,0 +1,5 @@
+from django.conf import settings
+
+ADDITIONAL_CHART_OPERATIONS = getattr(
+    settings, 'OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS', {}
+)
diff --git a/openwisp_monitoring/db/utils.py 
b/openwisp_monitoring/db/backends/elasticsearch/tests/__init__.py similarity index 100% rename from openwisp_monitoring/db/utils.py rename to openwisp_monitoring/db/backends/elasticsearch/tests/__init__.py diff --git a/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py new file mode 100644 index 000000000..0f4f72f7c --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py @@ -0,0 +1,296 @@ +from datetime import datetime, timedelta +from importlib import reload +from unittest.mock import patch + +from celery.exceptions import Retry +from django.conf import settings +from django.core.exceptions import ValidationError +from django.utils.timezone import now +from elasticsearch.exceptions import ElasticsearchException +from freezegun import freeze_time +from pytz import timezone as tz + +from openwisp_monitoring.device.settings import SHORT_RETENTION_POLICY +from openwisp_monitoring.device.tests import DeviceMonitoringTestCase +from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy + +from ....exceptions import TimeseriesWriteException +from ... import timeseries_db +from .. import queries as queries_module +from ..index import MetricDocument + + +class TestDatabaseClient(DeviceMonitoringTestCase): + def test_get_query_fields_function(self): + c = self._create_chart(test_data=None, configuration='histogram') + q = c.get_query(fields=['ssh', 'http2', 'apple-music']) + self.assertIn("'ssh': {'sum': {'field': 'points.fields.ssh'}}", str(q)) + self.assertIn("'http2': {'sum': {'field': 'points.fields.http2'}}", str(q)) + self.assertIn( + "'apple-music': {'sum': {'field': 'points.fields.apple-music'}}", str(q) + ) + + def test_default_query(self): + c = self._create_chart(test_data=False) + q = timeseries_db.default_chart_query(tags=True) + self.assertEqual(c.query, q) + + def test_write(self): + timeseries_db.write('test_write', dict(value=2)) + measurement = timeseries_db.read(key='test_write', fields='value')[0] + self.assertEqual(measurement['value'], 2) + + def test_general_write(self): + m = self._create_general_metric(name='Sync test') + m.write(1) + measurement = timeseries_db.read(key='sync_test', fields='value')[0] + self.assertEqual(measurement['value'], 1) + + def test_object_write(self): + om = self._create_object_metric() + om.write(3) + measurement = timeseries_db.read( + key='test_metric', fields='value', tags=om.tags + )[0] + self.assertEqual(measurement['value'], 3) + + def test_general_same_key_different_fields(self): + down = self._create_general_metric( + name='traffic (download)', key='traffic', field_name='download' + ) + down.write(200) + up = self._create_general_metric( + name='traffic (upload)', key='traffic', field_name='upload' + ) + up.write(100) + measurement = timeseries_db.read(key='traffic', fields='download')[0] + self.assertEqual(measurement['download'], 200) + measurement = timeseries_db.read(key='traffic', fields='upload')[0] + self.assertEqual(measurement['upload'], 100) + + def test_object_same_key_different_fields(self): + user = self._create_user() + user_down = self._create_object_metric( + name='traffic (download)', + key='traffic', + field_name='download', + content_object=user, + ) + user_down.write(200) + user_up = self._create_object_metric( + name='traffic (upload)', + key='traffic', + field_name='upload', + content_object=user, + ) + user_up.write(100) + measurement = timeseries_db.read( + key='traffic', 
fields='download', tags=user_down.tags + )[0] + self.assertEqual(measurement['download'], 200) + measurement = timeseries_db.read( + key='traffic', fields='upload', tags=user_up.tags + )[0] + self.assertEqual(measurement['upload'], 100) + + def test_get_query_1d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='1d') + time_map = c.GROUP_MAP['1d'] + self.assertIn( + "{'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}}", str(q) + ) + self.assertIn(f"'fixed_interval': '{time_map}'", str(q)) + + def test_get_query_30d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='30d') + time_map = c.GROUP_MAP['30d'] + self.assertIn( + "{'range': {'points.time': {'from': 'now-30d/d', 'to': 'now/d'}}}", str(q) + ) + self.assertIn(f"'fixed_interval': '{time_map}'", str(q)) + + def test_retention_policy(self): + manage_short_retention_policy() + rp = timeseries_db.get_list_retention_policies() + assert 'default' in rp + assert SHORT_RP in rp + days = f'{int(SHORT_RETENTION_POLICY.split("h")[0]) // 24}d' + self.assertEqual( + rp['short']['policy']['phases']['hot']['actions']['rollover']['max_age'], + days, + ) + + def test_get_query(self): + c = self._create_chart(test_data=False) + m = c.metric + params = dict( + field_name=m.field_name, + key=m.key, + content_type=m.content_type_key, + object_id=m.object_id, + time=c.DEFAULT_TIME, + ) + expected = timeseries_db.get_query( + c.type, + params, + c.DEFAULT_TIME, + c.GROUP_MAP, + query=c.query, + timezone=settings.TIME_ZONE, + ) + self.assertEqual(c.get_query(), expected) + + def test_query_no_index(self): + timeseries_db.delete_metric_data(key='ping') + c = self._create_chart(test_data=False) + q = c.get_query() + self.assertEqual(timeseries_db.query(q, index='ping'), {}) + self.assertEqual(timeseries_db.get_list_query(q), []) + + def test_1d_chart_data(self): + c = self._create_chart() + data = c.read(time='1d') + self.assertIn('x', data) + self.assertEqual(len(data['x']), 144) + self.assertIn('traces', data) + self.assertEqual(9.0, data['traces'][0][1][-1]) + # Test chart with old data has same length + m = self._create_general_metric(name='dummy') + c = self._create_chart(metric=m, test_data=False) + m.write(6.0, time=now() - timedelta(hours=23)) + data = c.read(time='1d') + self.assertIn('x', data) + self.assertEqual(len(data['x']), 144) + self.assertIn('traces', data) + self.assertIn(6.0, data['traces'][0][1]) + + def test_delete_metric_data(self): + obj = self._create_user() + om = self._create_object_metric(name='Logins', content_object=obj) + om.write(100) + self.assertEqual(om.read()[0]['value'], 100) + timeseries_db.delete_metric_data(key=om.key, tags=om.tags) + + def test_invalid_query(self): + q = timeseries_db.default_chart_query() + q['query']['nested']['query']['must'] = 'invalid' + try: + timeseries_db.validate_query(q) + except ValidationError as e: + self.assertIn('ParsingException: [bool] malformed query', str(e)) + + def test_non_aggregation_query(self): + q = {'query': timeseries_db.default_chart_query()['query']} + self.assertEqual(timeseries_db.get_list_query(q), []) + + def test_timestamp_precision(self): + c = self._create_chart() + points = timeseries_db.get_list_query(c.get_query(), precision='ms') + self.assertIsInstance(points[0]['time'], float) + points = timeseries_db.get_list_query(c.get_query(), precision='s') + self.assertIsInstance(points[0]['time'], int) + + def create_docs_single_index(self): + m = 
self._create_object_metric(name='dummy') + m.write(1) + d = self._create_device(organization=self._create_org()) + m2 = self._create_object_metric(name='dummy', content_object=d) + m2.write(1) + self.assertEqual(len(timeseries_db.get_db.indices.get_alias(name='dummy')), 1) + + def test_additional_chart_operations_setting(self): + modify_operators = { + 'upload': {'operator': '/', 'value': 1000000}, + 'download': {'operator': '/', 'value': 1000000}, + } + path = 'openwisp_monitoring.db.backends.elasticsearch.queries.ADDITIONAL_CHART_OPERATIONS' + with patch.dict(path, modify_operators, clear=True): + queries = reload(queries_module) + self.assertEqual(queries.ADDITIONAL_CHART_OPERATIONS, modify_operators) + self.assertEqual(queries.math_map['upload'], modify_operators['upload']) + self.assertEqual(queries.math_map['download'], modify_operators['download']) + + def test_read(self): + c = self._create_chart() + data = c.read() + key = c.metric.field_name + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 168) + charts = data['traces'] + self.assertEqual(charts[0][0], key) + self.assertEqual(len(charts[0][1]), 168) + self.assertTrue(all(elem in charts[0][1] for elem in [3, 6, 9])) + + def test_read_multiple(self): + c = self._create_chart(test_data=None, configuration='multiple_test') + m1 = c.metric + m2 = self._create_object_metric( + name='test metric 2', + key='test_metric', + field_name='value2', + content_object=m1.content_object, + ) + now_ = now() + for n in range(0, 3): + time = now_ - timedelta(days=n) + m1.write(n + 1, time=time) + m2.write(n + 2, time=time) + data = c.read() + f1 = m1.field_name + f2 = 'value2' + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 168) + charts = data['traces'] + self.assertIn(f1, charts[0][0]) + self.assertIn(f2, charts[1][0]) + self.assertEqual(len(charts[0][1]), 168) + self.assertEqual(len(charts[1][1]), 168) + self.assertTrue(all(elem in charts[0][1] for elem in [3, 2, 1])) + self.assertTrue(all(elem in charts[1][1] for elem in [4, 3, 2])) + + def test_ilm_disabled(self): + with patch.object(timeseries_db, 'ilm_enabled', False): + self.assertFalse(timeseries_db.ilm_enabled) + self.assertIsNone( + timeseries_db.create_or_alter_retention_policy(name='default') + ) + self.assertIsNone(timeseries_db.get_list_retention_policies()) + + @patch.object(MetricDocument, 'get', side_effect=ElasticsearchException) + def test_write_retry(self, mock_write): + with self.assertRaises(TimeseriesWriteException): + timeseries_db.write('test_write', {'value': 1}) + m = self._create_general_metric(name='Test metric') + with self.assertRaises(Retry): + m.write(1) + + @patch.object(MetricDocument, 'get', side_effect=ElasticsearchException) + def test_timeseries_write_params(self, mock_write): + with freeze_time('Jan 14th, 2020') as frozen_datetime: + m = self._create_general_metric(name='Test metric') + with self.assertRaises(Retry) as e: + m.write(1) + frozen_datetime.tick(delta=timedelta(minutes=10)) + self.assertEqual( + now(), datetime(2020, 1, 14, tzinfo=tz('UTC')) + timedelta(minutes=10) + ) + task_signature = e.exception.sig + with patch.object(timeseries_db, 'write') as mock_write: + self._retry_task(task_signature) + mock_write.assert_called_with( + 'test_metric', + {'value': 1}, + database=None, + retention_policy=None, + tags={}, + # this should be the original time at the moment of first failure + timestamp='2020-01-14T00:00:00Z', + ) + + def _retry_task(self, 
task_signature): + task_kwargs = task_signature.kwargs + task_signature.type.run(**task_kwargs) diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py index ab443056a..ae67d37da 100644 --- a/openwisp_monitoring/db/backends/influxdb/client.py +++ b/openwisp_monitoring/db/backends/influxdb/client.py @@ -52,7 +52,6 @@ class DatabaseClient(object): backend_name = 'influxdb' def __init__(self, db_name=None): - self._db = None self.db_name = db_name or TIMESERIES_DB['NAME'] self.client_error = InfluxDBClientError @@ -164,7 +163,7 @@ def read(self, key, fields, tags, **kwargs): q = f'{q} LIMIT {limit}' return list(self.query(q, precision='s').get_points()) - def get_list_query(self, query, precision='s'): + def get_list_query(self, query, precision='s', **kwargs): return list(self.query(query, precision=precision).get_points()) def get_list_retention_policies(self): @@ -269,6 +268,7 @@ def __transform_field(self, field, function, operation=None): def _get_top_fields( self, + default_query, query, params, chart_type, @@ -276,9 +276,15 @@ def _get_top_fields( number, time, timezone=settings.TIME_ZONE, + get_fields=True, ): + """ + Returns top fields if ``get_fields`` set to ``True`` (default) + else it returns points containing the top fields. + """ + q = default_query.replace('{field_name}', '{fields}') q = self.get_query( - query=query, + query=q, params=params, chart_type=chart_type, group_map=group_map, @@ -287,7 +293,7 @@ def _get_top_fields( time=time, timezone=timezone, ) - res = list(self.query(q, precision='s').get_points()) + res = self.get_list_query(q) if not res: return [] res = res[0] @@ -297,4 +303,31 @@ def _get_top_fields( keys = list(sorted_dict.keys()) keys.reverse() top = keys[0:number] - return [item.replace('sum_', '') for item in top] + top_fields = [item.replace('sum_', '') for item in top] + if get_fields: + return top_fields + query = self.get_query( + query=query, + params=params, + chart_type=chart_type, + group_map=group_map, + summary=True, + fields=top_fields, + time=time, + timezone=timezone, + ) + return self.get_list_query(query) + + def default_chart_query(self, tags): + q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'" + if tags: + q += " AND content_type = '{content_type}' AND object_id = '{object_id}'" + return q + + def _device_data(self, key, tags, rp, **kwargs): + """ returns last snapshot of ``device_data`` """ + query = ( + f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' " + "ORDER BY time DESC LIMIT 1" + ) + return self.get_list_query(query, precision=None) diff --git a/openwisp_monitoring/db/backends/influxdb/queries.py b/openwisp_monitoring/db/backends/influxdb/queries.py index 1ebc07412..47523cb6e 100644 --- a/openwisp_monitoring/db/backends/influxdb/queries.py +++ b/openwisp_monitoring/db/backends/influxdb/queries.py @@ -58,12 +58,3 @@ ) }, } - -default_chart_query = [ - "SELECT {field_name} FROM {key} WHERE time >= '{time}'", - " AND content_type = '{content_type}' AND object_id = '{object_id}'", -] - -device_data_query = ( - "SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1" -) diff --git a/openwisp_monitoring/db/backends/influxdb/tests/__init__.py b/openwisp_monitoring/db/backends/influxdb/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openwisp_monitoring/db/backends/influxdb/tests.py b/openwisp_monitoring/db/backends/influxdb/tests/client_tests.py similarity index 80% rename from 
openwisp_monitoring/db/backends/influxdb/tests.py rename to openwisp_monitoring/db/backends/influxdb/tests/client_tests.py index bea86bdab..d22b8e446 100644 --- a/openwisp_monitoring/db/backends/influxdb/tests.py +++ b/openwisp_monitoring/db/backends/influxdb/tests/client_tests.py @@ -1,7 +1,8 @@ -from datetime import datetime, timedelta +from datetime import date, datetime, timedelta from unittest.mock import patch from celery.exceptions import Retry +from django.conf import settings from django.core.exceptions import ValidationError from django.test import TestCase from django.utils.timezone import now @@ -15,8 +16,8 @@ from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy from openwisp_monitoring.monitoring.tests import TestMonitoringMixin -from ...exceptions import TimeseriesWriteException -from .. import timeseries_db +from ....exceptions import TimeseriesWriteException +from ... import timeseries_db Chart = load_model('monitoring', 'Chart') Notification = load_model('openwisp_notifications', 'Notification') @@ -46,16 +47,6 @@ def test_get_custom_query(self): q = c.get_query(query=custom_q, fields=['SUM(*)']) self.assertIn('SELECT SUM(*) FROM', q) - def test_is_aggregate_bug(self): - m = self._create_object_metric(name='summary_avg') - c = Chart(metric=m, configuration='dummy') - self.assertFalse(timeseries_db._is_aggregate(c.query)) - - def test_is_aggregate_fields_function(self): - m = self._create_object_metric(name='is_aggregate_func') - c = Chart(metric=m, configuration='uptime') - self.assertTrue(timeseries_db._is_aggregate(c.query)) - def test_get_query_fields_function(self): c = self._create_chart(test_data=None, configuration='histogram') q = c.get_query(fields=['ssh', 'http2', 'apple-music']) @@ -150,21 +141,6 @@ def test_object_same_key_different_fields(self): measurement = timeseries_db.get_list_query(q)[0] self.assertEqual(measurement['upload'], 100) - def test_delete_metric_data(self): - m = self._create_general_metric(name='test_metric') - m.write(100) - self.assertEqual(m.read()[0]['value'], 100) - timeseries_db.delete_metric_data(key=m.key) - self.assertEqual(m.read(), []) - om = self._create_object_metric(name='dummy') - om.write(50) - m.write(100) - self.assertEqual(m.read()[0]['value'], 100) - self.assertEqual(om.read()[0]['value'], 50) - timeseries_db.delete_metric_data() - self.assertEqual(m.read(), []) - self.assertEqual(om.read(), []) - def test_get_query_1d(self): c = self._create_chart(test_data=None, configuration='uptime') q = c.get_query(time='1d') @@ -197,27 +173,11 @@ def test_query_set(self): ) self.assertEqual(c.query, expected) self.assertEqual( - ''.join(timeseries_db.queries.default_chart_query[0:2]), c._default_query + ''.join(timeseries_db.default_chart_query(tags=c.metric.tags)), + c._default_query, ) c.metric.object_id = None - self.assertEqual(timeseries_db.queries.default_chart_query[0], c._default_query) - - def test_read_order(self): - m = self._create_general_metric(name='dummy') - m.write(30) - m.write(40, time=now() - timedelta(days=2)) - with self.subTest('Test ascending read order'): - metric_data = m.read(limit=2, order='time') - self.assertEqual(metric_data[0]['value'], 40) - self.assertEqual(metric_data[1]['value'], 30) - with self.subTest('Test descending read order'): - metric_data = m.read(limit=2, order='-time') - self.assertEqual(metric_data[0]['value'], 30) - self.assertEqual(metric_data[1]['value'], 40) - with self.subTest('Test invalid read order'): - with 
self.assertRaises(timeseries_db.client_error) as e: - metric_data = m.read(limit=2, order='invalid') - self.assertIn('Invalid order "invalid" passed.', str(e)) + self.assertEqual(timeseries_db.default_chart_query(tags=None), c._default_query) def test_read_with_rp(self): self._create_admin() @@ -293,3 +253,59 @@ def test_timeseries_write_params(self, mock_write): def _retry_task(self, task_signature): task_kwargs = task_signature.kwargs task_signature.type.run(**task_kwargs) + + def test_get_query(self): + c = self._create_chart(test_data=False) + m = c.metric + now_ = now() + today = date(now_.year, now_.month, now_.day) + time = today - timedelta(days=6) + expected = c.query.format( + field_name=m.field_name, + key=m.key, + content_type=m.content_type_key, + object_id=m.object_id, + time=str(time), + ) + expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE) + self.assertEqual(c.get_query(), expected) + + def test_read(self): + c = self._create_chart() + data = c.read() + key = c.metric.field_name + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 3) + charts = data['traces'] + self.assertEqual(charts[0][0], key) + self.assertEqual(len(charts[0][1]), 3) + self.assertEqual(charts[0][1], [3, 6, 9]) + + def test_read_multiple(self): + c = self._create_chart(test_data=None, configuration='multiple_test') + m1 = c.metric + m2 = self._create_object_metric( + name='test metric 2', + key='test_metric', + field_name='value2', + content_object=m1.content_object, + ) + now_ = now() + for n in range(0, 3): + time = now_ - timedelta(days=n) + m1.write(n + 1, time=time) + m2.write(n + 2, time=time) + data = c.read() + f1 = m1.field_name + f2 = 'value2' + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 3) + charts = data['traces'] + self.assertIn(f1, charts[0][0]) + self.assertIn(f2, charts[1][0]) + self.assertEqual(len(charts[0][1]), 3) + self.assertEqual(len(charts[1][1]), 3) + self.assertEqual(charts[0][1], [3, 2, 1]) + self.assertEqual(charts[1][1], [4, 3, 2]) diff --git a/openwisp_monitoring/db/tests.py b/openwisp_monitoring/db/tests.py new file mode 100644 index 000000000..c0103c869 --- /dev/null +++ b/openwisp_monitoring/db/tests.py @@ -0,0 +1,55 @@ +from datetime import timedelta + +from django.utils.timezone import now +from swapper import load_model + +from . 
import timeseries_db +from .backends import load_backend_module + +Chart = load_model('monitoring', 'Chart') +tests = load_backend_module(module='tests.client_tests') + + +class TestDatabaseClient(tests.TestDatabaseClient): + def test_is_aggregate_bug(self): + m = self._create_object_metric(name='summary_avg') + c = Chart(metric=m, configuration='dummy') + self.assertFalse(timeseries_db._is_aggregate(c.query)) + + def test_is_aggregate_fields_function(self): + m = self._create_object_metric(name='is_aggregate_func') + c = Chart(metric=m, configuration='uptime') + self.assertTrue(timeseries_db._is_aggregate(c.query)) + + def test_delete_metric_data(self): + m = self._create_general_metric(name='test_metric') + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + timeseries_db.delete_metric_data(key=m.key) + self.assertEqual(m.read(), []) + om = self._create_object_metric(name='dummy') + om.write(50) + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + self.assertEqual(om.read()[0]['value'], 50) + timeseries_db.delete_metric_data() + self.assertEqual(m.read(), []) + self.assertEqual(om.read(), []) + + def test_read_order(self): + timeseries_db.delete_metric_data() + m = self._create_general_metric(name='dummy') + m.write(40, time=now() - timedelta(days=2)) + m.write(30) + with self.subTest('Test ascending read order'): + metric_data = m.read(limit=2, order='time') + self.assertEqual(metric_data[0]['value'], 40) + self.assertEqual(metric_data[1]['value'], 30) + with self.subTest('Test descending read order'): + metric_data = m.read(limit=2, order='-time') + self.assertEqual(metric_data[0]['value'], 30) + self.assertEqual(metric_data[1]['value'], 40) + with self.subTest('Test invalid read order'): + with self.assertRaises(timeseries_db.client_error) as e: + metric_data = m.read(limit=2, order='invalid') + self.assertIn('Invalid order "invalid" passed.', str(e)) diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py index 8b7ff22e9..385400064 100644 --- a/openwisp_monitoring/device/base/models.py +++ b/openwisp_monitoring/device/base/models.py @@ -22,7 +22,7 @@ from openwisp_utils.base import TimeStampedEditableModel -from ...db import device_data_query, timeseries_db +from ...db import timeseries_db from ...monitoring.signals import threshold_crossed from ...monitoring.tasks import timeseries_write from .. 
import settings as app_settings @@ -104,11 +104,12 @@ def data(self): """ if self.__data: return self.__data - q = device_data_query.format(SHORT_RP, self.__key, self.pk) cache_key = get_device_cache_key(device=self, context='current-data') points = cache.get(cache_key) if not points: - points = timeseries_db.get_list_query(q, precision=None) + points = timeseries_db._device_data( + rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data' + ) if not points: return None self.data_timestamp = points[0]['time'] diff --git a/openwisp_monitoring/device/tests/test_api.py b/openwisp_monitoring/device/tests/test_api.py index b248f75b4..08e40d7c4 100644 --- a/openwisp_monitoring/device/tests/test_api.py +++ b/openwisp_monitoring/device/tests/test_api.py @@ -289,10 +289,10 @@ def test_get_device_metrics_csv(self): '2', '0.4', '0.1', - '2.0', - '1.0', + '2', + '1', '9.73', - '0.0', + '0', '8.27', ], ) diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index 6f92ff24f..abfcdd59f 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -20,7 +20,7 @@ from openwisp_utils.base import TimeStampedEditableModel -from ...db import default_chart_query, timeseries_db +from ...db import timeseries_db from ..configuration import ( CHART_CONFIGURATION_CHOICES, DEFAULT_COLORS, @@ -338,10 +338,8 @@ def top_fields(self): @property def _default_query(self): - q = default_chart_query[0] - if self.metric.object_id: - q += default_chart_query[1] - return q + tags = True if self.metric.object_id else False + return timeseries_db.default_chart_query(tags) def get_query( self, @@ -362,10 +360,10 @@ def get_top_fields(self, number): Returns list of top ``number`` of fields (highest sum) of a measurement in the specified time range (descending order). 
""" - q = self._default_query.replace('{field_name}', '{fields}') params = self._get_query_params(self.DEFAULT_TIME) return timeseries_db._get_top_fields( - query=q, + default_query=self._default_query, + query=self.get_query(), chart_type=self.type, group_map=self.GROUP_MAP, number=number, @@ -410,16 +408,23 @@ def read( try: query_kwargs = dict(time=time, timezone=timezone) if self.top_fields: - fields = self.get_top_fields(self.top_fields) - data_query = self.get_query(fields=fields, **query_kwargs) - summary_query = self.get_query( - fields=fields, summary=True, **query_kwargs + points = summary = timeseries_db._get_top_fields( + default_query=self._default_query, + chart_type=self.type, + group_map=self.GROUP_MAP, + number=self.top_fields, + params=self._get_query_params(self.DEFAULT_TIME), + time=time, + query=self.query, + get_fields=False, ) else: data_query = self.get_query(**query_kwargs) summary_query = self.get_query(summary=True, **query_kwargs) - points = timeseries_db.get_list_query(data_query) - summary = timeseries_db.get_list_query(summary_query) + points = timeseries_db.get_list_query(data_query, key=self.metric.key) + summary = timeseries_db.get_list_query( + summary_query, key=self.metric.key + ) except timeseries_db.client_error as e: logging.error(e, exc_info=True) raise e @@ -471,7 +476,11 @@ def _round(value, decimal_places): control = 1.0 / 10 ** decimal_places if value < control: decimal_places += 2 - return round(value, decimal_places) + value = round(value, decimal_places) + # added for Elasticsearch division outputs (traffic metric) + if isinstance(value, float) and value.is_integer(): + value = int(value) + return value class AbstractAlertSettings(TimeStampedEditableModel): diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py index ac943a8ca..9803a19c8 100644 --- a/openwisp_monitoring/monitoring/tests/__init__.py +++ b/openwisp_monitoring/monitoring/tests/__init__.py @@ -7,6 +7,7 @@ from ...db import timeseries_db from ...db.backends import TIMESERIES_DB +from ...db.backends.elasticsearch.queries import _make_query from ..configuration import ( register_chart, register_metric, @@ -80,7 +81,10 @@ "SELECT {fields|SUM|/ 1} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query( + {'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}} + ), }, }, 'dummy': { @@ -97,7 +101,7 @@ 'description': 'Bugged chart for testing purposes.', 'unit': 'bugs', 'order': 999, - 'query': {'influxdb': "BAD"}, + 'query': {'influxdb': "BAD", 'elasticsearch': "BAD"}, }, 'default': { 'type': 'line', @@ -109,7 +113,8 @@ 'influxdb': ( "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query(), }, }, 'multiple_test': { @@ -122,7 +127,13 @@ 'influxdb': ( "SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query( + { + '{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}, + 'value2': {'sum': {'field': 'points.fields.value2'}}, + } + ), }, }, 'mean_test': { @@ -135,7 +146,8 @@ 'influxdb': ( "SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query(), }, }, 'sum_test': { @@ 
diff --git a/openwisp_monitoring/monitoring/tests/test_charts.py b/openwisp_monitoring/monitoring/tests/test_charts.py
index 5adf6f33e..704e9bc36 100644
--- a/openwisp_monitoring/monitoring/tests/test_charts.py
+++ b/openwisp_monitoring/monitoring/tests/test_charts.py
@@ -2,7 +2,6 @@
 from datetime import date, timedelta
 from unittest.mock import patch
 
-from django.conf import settings
 from django.core.exceptions import ImproperlyConfigured, ValidationError
 from django.test import TestCase
 from django.utils.timezone import now
@@ -25,18 +24,6 @@ class TestCharts(TestMonitoringMixin, TestCase):
     Tests for functionalities related to charts
     """
 
-    def test_read(self):
-        c = self._create_chart()
-        data = c.read()
-        key = c.metric.field_name
-        self.assertIn('x', data)
-        self.assertIn('traces', data)
-        self.assertEqual(len(data['x']), 3)
-        charts = data['traces']
-        self.assertEqual(charts[0][0], key)
-        self.assertEqual(len(charts[0][1]), 3)
-        self.assertEqual(charts[0][1], [3, 6, 9])
-
     def test_read_summary_avg(self):
         m = self._create_object_metric(name='summary_avg')
         c = self._create_chart(metric=m, test_data=False, configuration='mean_test')
@@ -133,34 +120,6 @@ def test_read_summary_top_fields_acid(self):
         self.assertEqual(data['summary'], {'google': 87500000, 'facebook': 37503000})
         self.assertEqual(c.get_top_fields(2), ['google', 'facebook'])
 
-    def test_read_multiple(self):
-        c = self._create_chart(test_data=None, configuration='multiple_test')
-        m1 = c.metric
-        m2 = self._create_object_metric(
-            name='test metric 2',
-            key='test_metric',
-            field_name='value2',
-            content_object=m1.content_object,
-        )
-        now_ = now()
-        for n in range(0, 3):
-            time = now_ - timedelta(days=n)
-            m1.write(n + 1, time=time)
-            m2.write(n + 2, time=time)
-        data = c.read()
-        f1 = m1.field_name
-        f2 = 'value2'
-        self.assertIn('x', data)
-        self.assertIn('traces', data)
-        self.assertEqual(len(data['x']), 3)
-        charts = data['traces']
-        self.assertIn(f1, charts[0][0])
-        self.assertIn(f2, charts[1][0])
-        self.assertEqual(len(charts[0][1]), 3)
-        self.assertEqual(len(charts[1][1]), 3)
-        self.assertEqual(charts[0][1], [3, 2, 1])
-        self.assertEqual(charts[1][1], [4, 3, 2])
-
     def test_json(self):
         c = self._create_chart()
         data = c.read()
@@ -180,22 +139,6 @@ def test_read_bad_query(self):
         else:
             self.fail('ValidationError not raised')
 
-    def test_get_query(self):
-        c = self._create_chart(test_data=False)
-        m = c.metric
-        now_ = now()
-        today = date(now_.year, now_.month, now_.day)
-        time = today - timedelta(days=6)
-        expected = c.query.format(
-            field_name=m.field_name,
-            key=m.key,
-            content_type=m.content_type_key,
-            object_id=m.object_id,
-            time=str(time),
-        )
-        expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE)
-        self.assertEqual(c.get_query(), expected)
-
     def test_description(self):
         c = self._create_chart(test_data=False)
         self.assertEqual(c.description, 'Dummy chart for testing purposes.')
diff --git a/requirements.txt b/requirements.txt
index bbfb664a0..158898a6b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@ openwisp-controller @ https://github.com/openwisp/openwisp-controller/tarball/ma
 # is the one dictated by openwisp-controller
 influxdb~=5.3
 django-celery-email~=3.0.0
+mac-vendor-lookup~=0.1
 django-cache-memoize~=0.1
 django-nested-admin~=3.3.2
 swapper~=1.1
diff --git a/setup.py b/setup.py
index 43ca4bb97..b2a629f7f 100755
--- a/setup.py
+++ b/setup.py
@@ -55,6 +55,10 @@ def get_install_requires():
     include_package_data=True,
     zip_safe=False,
     install_requires=get_install_requires(),
+    extras_require={
+        'elasticsearch': ['elasticsearch-dsl>=7.0.0,<8.0.0'],
+        'influxdb': ['influxdb>=5.2,<5.3'],
+    },
     classifiers=[
         'Development Status :: 3 - Alpha',
         'Environment :: Web Environment',
@@ -64,7 +68,7 @@
         'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
         'Operating System :: OS Independent',
         'Framework :: Django',
-        'Programming Language :: Python :: 2.7',
-        'Programming Language :: Python :: 3.4',
+        'Programming Language :: Python :: 3.6',
+        'Programming Language :: Python :: 3.7',
     ],
 )
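
The new ``extras_require`` section makes the backend client libraries
installable as extras; note that ``requirements.txt`` above still lists
``influxdb`` unconditionally, so only the Elasticsearch client is truly
opt-in. A usage sketch (assuming the published package name stays
``openwisp-monitoring``):

.. code-block:: shell

    pip install openwisp-monitoring[elasticsearch]
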
diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py
index e8ab72828..6bba45185 100644
--- a/tests/openwisp2/settings.py
+++ b/tests/openwisp2/settings.py
@@ -19,7 +19,7 @@
     }
 }
 
-TIMESERIES_DATABASE = {
+INFLUXDB_SETTINGS = {
     'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
     'USER': 'openwisp',
     'PASSWORD': 'openwisp',
@@ -28,6 +28,20 @@
     'PORT': '8086',
 }
 
+ELASTICSEARCH_SETTINGS = {
+    'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
+    'USER': 'openwisp',
+    'PASSWORD': 'openwisp',
+    'NAME': 'openwisp2',
+    'HOST': os.getenv('ELASTICSEARCH_HOST', 'localhost'),
+    'PORT': '9200',
+}
+
+if os.environ.get('elasticsearch', False):
+    TIMESERIES_DATABASE = ELASTICSEARCH_SETTINGS
+else:
+    TIMESERIES_DATABASE = INFLUXDB_SETTINGS
+
 SECRET_KEY = 'fn)t*+$)ugeyip6-#txyy$5wf2ervc0d2n#h)qb)y5@ly$t*@w'
 
 INSTALLED_APPS = [