diff --git a/.travis.yml b/.travis.yml index 302077a76..1b3602049 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,6 @@ addons: services: - docker - - redis-server branches: only: @@ -32,19 +31,23 @@ branches: - dev before_install: - - docker run -d --name influxdb -e INFLUXDB_DB=openwisp2 -p 8086:8086 influxdb:alpine + - docker-compose up -d - pip install -U pip wheel setuptools - pip install $DJANGO - pip install -U -r requirements-test.txt install: - - pip install -e . + - pip install -e .[influxdb,elasticsearch] - sh install-dev.sh script: - ./run-qa-checks - - SAMPLE_APP=1 coverage run --source=openwisp_monitoring runtests.py - - coverage run -a --source=openwisp_monitoring runtests.py + - SAMPLE_APP=1 coverage run -p --source=openwisp_monitoring runtests.py + - coverage run -p --source=openwisp_monitoring runtests.py + - elasticsearch=1 SAMPLE_APP=1 coverage run -p --source=openwisp_monitoring runtests.py + - elasticsearch=1 coverage run -p --source=openwisp_monitoring runtests.py + - coverage combine + - coverage report -m jobs: include: diff --git a/Dockerfile b/Dockerfile index 25fc30a61..f6f0fff1c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,7 @@ WORKDIR /opt/openwisp/tests/ ENV NAME=openwisp-monitoring \ PYTHONBUFFERED=1 \ INFLUXDB_HOST=influxdb \ - REDIS_HOST=redis + REDIS_HOST=redis \ + ELASTICSEARCH_HOST=es01 CMD ["sh", "docker-entrypoint.sh"] EXPOSE 8000 diff --git a/README.rst b/README.rst index 12ffa1dfa..b996e69b5 100644 --- a/README.rst +++ b/README.rst @@ -28,7 +28,7 @@ Available Features * Collects and displays `device status <#device-status>`_ information like uptime, RAM status, CPU load averages, Interface properties and addresses, WiFi interface status and associated clients, Neighbors information, DHCP Leases, Disk/Flash status -* Collection of monitoring information in a timeseries database (currently only influxdb is supported) +* Collection of monitoring information in a timeseries database (`InfluxDB `_ and `Elasticsearch `_ are currently supported) * Monitoring charts for uptime, packet loss, round trip time (latency), associated wifi clients, interface traffic, RAM usage, CPU load, flash/disk usage * Charts can be viewed at resolutions of 1 day, 3 days, a week, a month and a year @@ -50,6 +50,8 @@ beforehand. In case you prefer not to use Docker you can `install InfluxDB `_ and Redis from your repositories, but keep in mind that the version packaged by your distribution may be different. +If you wish to use ``Elasticsearch`` for storing and retrieving timeseries data, then `install Elasticsearch `_. + Install spatialite and sqlite: .. code-block:: shell @@ -110,6 +112,19 @@ Follow the setup instructions of `openwisp-controller 'PORT': '8086', } +If you wish to use ``Elasticsearch`` for timeseries data storage and retrieval, +use the following settings: +.. code-block:: python + TIMESERIES_DATABASE = { + 'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch', + 'USER': 'openwisp', + 'PASSWORD': 'openwisp', + 'NAME': 'openwisp2', + 'HOST': 'localhost', + 'PORT': '9200', + } + ``urls.py``: .. code-block:: python @@ -422,6 +437,9 @@ This data is only used to assess the recent status of devices, keeping it for a long time would not add much benefit and would cost a lot more in terms of disk space. +**Note**: If you use ``Elasticsearch``, the time is interpreted as an integral multiple of a day. +That means a time of ``36h0m0s`` is interpreted as ``24h0m0s`` (an integral multiple of a day). 
+ ``OPENWISP_MONITORING_AUTO_PING`` ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -578,11 +596,21 @@ MB (megabytes) instead of GB (Gigabytes) you can use: "SUM(rx_bytes) / 1000000 AS download FROM {key} " "WHERE time >= '{time}' AND content_type = '{content_type}' " "AND object_id = '{object_id}' GROUP BY time(1d)" - ) + ), + 'elasticsearch': _make_query({ + 'upload': {'sum': {'field': 'points.fields.tx_bytes'}}, + 'download': {'avg': {'field': 'points.fields.rx_bytes'}}, + }) }, } } + # Please declare the operations separately in case you use elasticsearch as done below + OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = { + 'upload': {'operator': '/', 'value': 1000000}, + 'download': {'operator': '/', 'value': 1000000}, + } + Or if you want to define a new chart configuration, which you can then call in your custom code (eg: a custom check class), you can do so as follows: @@ -590,6 +618,8 @@ call in your custom code (eg: a custom check class), you can do so as follows: from django.utils.translation import gettext_lazy as _ + from openwisp_monitoring.db.backends.elasticsearch import _make_query + OPENWISP_MONITORING_CHARTS = { 'ram': { 'type': 'line', @@ -603,7 +633,12 @@ call in your custom code (eg: a custom check class), you can do so as follows: "MEAN(buffered) AS buffered FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}' " "GROUP BY time(1d)" - ) + ), + 'elasticsearch': _make_query({ + 'total': {'avg': {'field': 'points.fields.total'}}, + 'free': {'avg': {'field': 'points.fields.free'}}, + 'buffered': {'avg': {'field': 'points.fields.buffered'}}, + }) }, } } diff --git a/docker-compose.yml b/docker-compose.yml index 6386355ed..795fed19a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,8 @@ services: depends_on: - influxdb - redis + - es01 + - es02 influxdb: image: influxdb:1.8-alpine @@ -22,6 +24,45 @@ services: INFLUXDB_DB: openwisp2 INFLUXDB_USER: openwisp INFLUXDB_USER_PASSWORD: openwisp + # clustered version of elasticsearch is used as that might be used in production + es01: + image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0 + container_name: es01 + environment: + - "node.name=es01" + - "discovery.seed_hosts=es02" + - "cluster.initial_master_nodes=es01,es02" + - "cluster.name=openwisp2" + - "bootstrap.memory_lock=true" + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - esdata01:/usr/share/elasticsearch/data + ports: + - 9200:9200 + networks: + - esnet + es02: + image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0 + container_name: es02 + environment: + - "node.name=es02" + - "discovery.seed_hosts=es01" + - "cluster.initial_master_nodes=es01,es02" + - "cluster.name=openwisp2" + - "bootstrap.memory_lock=true" + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - esdata02:/usr/share/elasticsearch/data + networks: + - esnet redis: image: redis:5.0-alpine @@ -31,3 +72,10 @@ services: volumes: influxdb-data: {} + esdata01: + driver: local + esdata02: + driver: local + +networks: + esnet: diff --git a/openwisp_monitoring/db/__init__.py b/openwisp_monitoring/db/__init__.py index 063d2d8f7..64510ebc5 100644 --- a/openwisp_monitoring/db/__init__.py +++ b/openwisp_monitoring/db/__init__.py @@ -1,7 +1,5 @@ from .backends import timeseries_db chart_query = timeseries_db.queries.chart_query -default_chart_query = timeseries_db.queries.default_chart_query -device_data_query = timeseries_db.queries.device_data_query -__all__ = 
['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query'] +__all__ = ['timeseries_db', 'chart_query'] diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py index ae4847acd..142a69110 100644 --- a/openwisp_monitoring/db/backends/__init__.py +++ b/openwisp_monitoring/db/backends/__init__.py @@ -48,7 +48,8 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): except ImportError as e: # The database backend wasn't found. Display a helpful error message # listing all built-in database backends. - builtin_backends = ['influxdb'] + builtin_backends = ['influxdb', 'elasticsearch'] + raise e if backend_name not in [ f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends ]: diff --git a/openwisp_monitoring/db/backends/elasticsearch/__init__.py b/openwisp_monitoring/db/backends/elasticsearch/__init__.py new file mode 100644 index 000000000..5eb061d1b --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/__init__.py @@ -0,0 +1,3 @@ +from .queries import _make_query + +__all__ = ['_make_query'] diff --git a/openwisp_monitoring/db/backends/elasticsearch/client.py b/openwisp_monitoring/db/backends/elasticsearch/client.py new file mode 100644 index 000000000..d5541e2e0 --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/client.py @@ -0,0 +1,461 @@ +import json +import logging +import re +from collections import Counter +from copy import deepcopy +from datetime import datetime, timedelta + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.utils.functional import cached_property +from elasticsearch import Elasticsearch +from elasticsearch.exceptions import ElasticsearchException, NotFoundError +from elasticsearch_dsl import Search +from elasticsearch_dsl.connections import connections +from pytz import timezone as tz + +from openwisp_utils.utils import deep_merge_dicts + +from .. 
import TIMESERIES_DB +from .index import MetricDocument, Point, find_metric +from .queries import default_chart_query, math_map, operator_lookup +from .retention_policies import _make_policy, default_rp_policy + +logger = logging.getLogger(__name__) + + +class DatabaseClient(object): + _AGGREGATE = [ + 'filters', + 'children', + 'parent', + 'date_histogram', + 'auto_date_histogram', + 'date_range', + 'geo_distance', + 'geohash_grid', + 'geotile_grid', + 'global', + 'geo_centroid', + 'global', + 'ip_range', + 'missing', + 'nested', + 'range', + 'reverse_nested', + 'significant_terms', + 'significant_text', + 'sampler', + 'terms', + 'diversified_sampler', + 'composite', + 'top_hits', + 'avg', + 'weighted_avg', + 'cardinality', + 'extended_stats', + 'geo_bounds', + 'max', + 'min', + 'percentiles', + 'percentile_ranks', + 'scripted_metric', + 'stats', + 'sum', + 'value_count', + ] + backend_name = 'elasticsearch' + + def __init__(self, db_name='metric'): + self.db_name = db_name or TIMESERIES_DB['NAME'] + self.client_error = ElasticsearchException + + def create_database(self): + """ creates connection to elasticsearch """ + connections.create_connection( + hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"] + ) + db = self.get_db + # Skip if support for Index Lifecycle Management is disabled or no privileges + self.ilm_enabled = db.ilm.start()['acknowledged'] + self.create_or_alter_retention_policy(name='default') + + def drop_database(self): + """ deletes all indices """ + logger.debug('Deleted all indices data from Elasticsearch') + + @cached_property + def get_db(self): + """ Returns an ``Elasticsearch Client`` instance """ + return Elasticsearch( + hosts=[{'host': TIMESERIES_DB['HOST'], 'port': TIMESERIES_DB['PORT']}], + http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']), + retry_on_timeout=True, + # sniff before doing anything + sniff_on_start=True, + # refresh nodes after a node fails to respond + sniff_on_connection_fail=True, + ) + + def create_or_alter_retention_policy(self, name, duration=None): + """ + creates or alters existing retention policy if necessary + + Note: default retention policy can't be altered with this function + """ + if not self.ilm_enabled: + return + ilm = self.get_db.ilm + if not duration: + try: + ilm.get_lifecycle(policy='default') + except NotFoundError: + ilm.put_lifecycle(policy='default', body=default_rp_policy) + return + days = f'{int(duration.split("h")[0]) // 24}d' + duration_changed = False + try: + policy = ilm.get_lifecycle(policy=name) + exists = True + current_duration = policy[name]['policy']['phases']['hot']['actions'][ + 'rollover' + ]['max_age'] + duration_changed = current_duration != days + except NotFoundError: + exists = False + if not exists or duration_changed: + policy = _make_policy(days) + ilm.put_lifecycle(policy=name, body=policy) + + def get_list_retention_policies(self): + if not self.ilm_enabled: + return + return self.get_db.ilm.get_lifecycle() + + def query(self, query, key=None, **kwargs): + try: + response = ( + Search(using=self.get_db, index=key) + .update_from_dict(query) + .execute() + .to_dict() + ) + if not response['hits']['total']['value']: + return {} + return response + except NotFoundError: + return {} + + def write(self, name, values, **kwargs): + rp = kwargs.get('retention_policy') + tags = kwargs.get('tags') + timestamp = kwargs.get('timestamp') + metric_id, index = find_metric(self.get_db, name, tags, rp, add=True) + metric_index = MetricDocument.get(metric_id, index=index, using=self.get_db) 
+ point = Point(fields=values, time=timestamp or datetime.now()) + metric_index.points.append(point) + metric_index.save() + # Elasticsearch automatically refreshes indices every second + # but we can't wait that long especially for tests + self.get_db.indices.refresh(index=name) + + def read(self, key, fields, tags=None, limit=1, order='time', **kwargs): + """ ``since`` should be of the form 'now() - s' """ + extra_fields = kwargs.get('extra_fields') + time_format = kwargs.get('time_format') + since = kwargs.get('since') + try: + metric_id, index = find_metric(self.get_db, key, tags) + except TypeError: + return [] + metric_index = MetricDocument.get(index=index, id=metric_id, using=self.get_db) + # distinguish between traffic and clients + points = [] + for point in list(metric_index.points): + if fields in point.fields.to_dict(): + points.append(point) + if order == 'time': + points = points[0:limit] + elif order == '-time': + points = list(reversed(points))[0:limit] + else: + raise self.client_error( + f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get ' + 'result sorted in ascending /descending order respectively.' + ) + if extra_fields and extra_fields != '*': + assert isinstance(extra_fields, list) + for count, point in enumerate(points): + fields_dict = point.to_dict()['fields'] + point = { + 'time': self._format_time(point['time'], time_format), + fields: fields_dict[fields], + } + for extra_field in extra_fields: + if fields_dict.get(extra_field) is not None: + point.update({extra_field: fields_dict[extra_field]}) + points[count] = point + elif extra_fields == '*': + for count, point in enumerate(points): + points[count] = deep_merge_dicts( + point.fields.to_dict(), + {'time': self._format_time(point.time, time_format)}, + ) + else: + points = [ + deep_merge_dicts( + {fields: p.fields.to_dict()[fields]}, + {'time': self._format_time(p.time, time_format)}, + ) + for p in points + ] + if since: + since = datetime.now().timestamp() - int(re.search(r'\d+', since).group()) + points = [point for point in points if point['time'] >= since] + return points + + def _format_time(self, obj, time_format=None): + """ returns datetime object in isoformat / unix timestamp and UTC timezone """ + if time_format == 'isoformat': + return obj.astimezone(tz=tz('UTC')).isoformat(timespec='seconds') + return int(obj.astimezone(tz=tz('UTC')).timestamp()) + + def get_list_query(self, query, precision='s', key=None): + response = self.query(query, key) + try: + points = response['aggregations']['GroupByTime']['set_range']['time'][ + 'buckets' + ] + list_points = self._fill_points( + query, [self._format(point, precision) for point in points], + ) + list_points.reverse() + except KeyError: + return [] + return list_points + + def _fill_points(self, query, points): + _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range'] + # if not _range or not points: + # return points + days = int(_range['filter']['range']['points.time']['from'][4:-3]) + # return if summary query + try: + interval = _range['aggs']['time']['date_histogram']['fixed_interval'] + except KeyError: + return points + # return if top_fields query + if interval == '365d': + return points + interval_dict = {'10m': 600, '20m': 1200, '1h': 3600, '24h': 86400} + interval = interval_dict[interval] + start_time = datetime.now() + end_time = start_time - timedelta(days=days) # include today + dummy_point = deepcopy(points[0]) + start_ts = points[0]['time'] + interval + end_ts = points[-1]['time'] - interval + for field 
in dummy_point.keys(): + dummy_point[field] = None if field != 'wifi_clients' else 0 + while start_ts < start_time.timestamp(): + dummy_point['time'] = start_ts + points.insert(0, deepcopy(dummy_point)) + start_ts += interval + # TODO: This is required due to time_zone issues + while points[-1]['time'] < end_time.timestamp(): + points.pop(-1) + while end_ts > end_time.timestamp(): + dummy_point['time'] = end_ts + points.append(deepcopy(dummy_point)) + end_ts -= interval + return points + + def delete_metric_data(self, key=None, tags=None): + """ + deletes a specific metric based on given key and tags; + deletes all metrics if neither provided + """ + if key and tags: + metric_id, index = find_metric(self.get_db, key, tags) + self.get_db.delete(index=index, id=metric_id) + elif key: + self.get_db.indices.delete_alias( + index=f'{key}-*', name=key, ignore=[400, 404] + ) + else: + self.get_db.indices.delete(index='*', ignore=[400, 404]) + # TODO: Speed up tests by retaining indices, almost 50s difference on travis + # try: + # self.get_db.delete_by_query(index='_all', body={'query': {'match_all': {}}}) + # except self.client_error: + # pass + + # Chart related functions below + + def validate_query(self, query): + if not isinstance(query, dict): + raise ValidationError( + {'configuration': f'error parsing query: found {query}'} + ) + # Elasticsearch currently supports validation of only query section, + # aggs, size, _source etc. are not supported + try: + valid_check = self.get_db.indices.validate_query( + body={'query': query['query']}, explain=True + ) + except NotFoundError: + return True + # Show a helpful message for failure + if not valid_check['valid']: + raise ValidationError(valid_check['error']) + return self._is_aggregate(query) + + def _is_aggregate(self, query): + agg_dict = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][ + 'time' + ]['aggs']['nest']['nested']['aggs'].values() + agg = [] + for item in agg_dict: + agg.append(next(iter(item))) + is_aggregate = True if set(agg) <= set(self._AGGREGATE) else False + return is_aggregate if 'size' in query else False + + def get_query( + self, + chart_type, + params, + time, + group_map, + summary=False, + fields=None, + query=None, + timezone=settings.TIME_ZONE, + ): + if not params.get('object_id'): + del query['query'] + query = json.dumps(query) + for k, v in params.items(): + query = query.replace('{' + k + '}', v) + query = self._group_by(query, time, chart_type, group_map, strip=summary) + set_range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][ + 'time' + ] + if fields: + aggregate_dict = set_range['aggs']['nest']['nested']['aggs'] + agg = deepcopy(aggregate_dict).popitem()[1].popitem()[0] + aggregate_dict.update( + { + f'{field}': {agg: {'field': f'points.fields.{field}'}} + for field in fields + } + ) + try: + set_range['date_histogram']['time_zone'] = timezone + except KeyError: + pass + return query + + def _group_by(self, query, time, chart_type, group_map, strip=False): + query = query.replace('1d/d', f'{time}/d') + if not strip and not chart_type == 'histogram': + value = group_map[time] + query = query.replace('10m', value) + if strip: + query = json.loads(query) + _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'] + _range['time'].pop('date_histogram') + _range['time']['auto_date_histogram'] = { + 'field': 'points.time', + 'format': 'date_time_no_millis', + 'buckets': 1, + } + if isinstance(query, str): + query = json.loads(query) + return query + + 
def _get_top_fields( + self, + params, + chart_type, + group_map, + number, + query=None, + timezone=settings.TIME_ZONE, + get_fields=True, + **kwargs, + ): + """ + Returns top fields if ``get_fields`` set to ``True`` (default) + else it returns points containing the top fields. + """ + try: + response = self.get_db.indices.get_mapping(index=params['key']) + except NotFoundError: + return [] + fields = [ + k + for k, v in list(response.values())[0]['mappings']['properties']['points'][ + 'properties' + ]['fields']['properties'].items() + ] + query = self.get_query( + chart_type=chart_type, + params=params, + time='365d', + group_map=group_map, + summary=True, + fields=fields, + query=query, + timezone=timezone, + ) + point = self.get_list_query(query, key=params['key'])[0] + time = point.pop('time') + point = Counter(point).most_common(number) + if get_fields: + return [k for k, v in point] + points = [{'time': time}] + for k, v in point: + points[0][k] = v + return points + + def _format(self, point, precision='s'): + """ allowed values for precision are ``s`` and ``ms`` """ + pt = {} + # By default values returned are in millisecond precision + if precision == 'ms': + pt['time'] = point['key'] / 1000 + else: + pt['time'] = point['key'] // 1000 + for key, value in point.items(): + if isinstance(value, dict): + for k, v in value.items(): + if isinstance(v, dict): + pt[k] = self._transform_field(k, v['value']) + return pt + + def _transform_field(self, field, value): + """ Performs arithmetic operations on the field if required """ + if value is None: + return value + if field in math_map: + op = operator_lookup.get(math_map[field]['operator']) + if op is not None: + value = op(value, math_map[field]['value']) + return value + + def default_chart_query(self, tags=False): + q = deepcopy(default_chart_query) + # This is used to distinguish that it's default query + del q['size'] + if not tags: + q['query']['nested']['query']['bool']['must'] = [] + return q + + def _device_data(self, key, tags, fields, **kwargs): + """ returns last snapshot of ``device_data`` """ + return self.read( + key=key, fields=fields, tags=tags, order='-time', time_format='isoformat', + ) + + +# TODO: +# _fill_points has a while which shouldn't be required diff --git a/openwisp_monitoring/db/backends/elasticsearch/index.py b/openwisp_monitoring/db/backends/elasticsearch/index.py new file mode 100644 index 000000000..ae6b6aad4 --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/index.py @@ -0,0 +1,92 @@ +import uuid + +from django.conf import settings +from elasticsearch.exceptions import NotFoundError +from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search + + +class Point(InnerDoc): + time = Date(required=True, default_timezone=settings.TIME_ZONE) + fields = Nested(dynamic=True, required=True, multi=True) + + +class MetricDocument(Document): + tags = Nested(dynamic=True, required=False, multi=True) + points = Nested(Point) + + class Index: + name = 'metric' + settings = { + 'number_of_shards': 1, + 'number_of_replicas': 0, + 'lifecycle.name': 'default', + 'lifecycle.rollover_alias': 'metric', + } + + +def find_metric(client, index, tags, retention_policy=None, add=False): + search = Search(using=client, index=index) + if tags: + tags_dict = dict() + for key, value in tags.items(): + tags_dict[f'tags.{key}'] = value + q = Q( + 'nested', + path='tags', + query=Q( + 'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()] + ), + ) + else: + q = Q() + try: + result = 
list(search.query(q).execute())[0].meta + return result['id'], result['index'] + except (NotFoundError, AttributeError, IndexError): + if add: + document = create_document( + client, index, tags, retention_policy=retention_policy + ) + return document['_id'], document['_index'] + return None + + +def create_document(client, key, tags, _id=None, retention_policy=None): + """ + Adds document to relevant index using ``keys``, ``tags`` and ``id`` provided. + If no ``id`` is provided a random ``uuid`` would be used. + """ + _id = str(_id or uuid.uuid1()) + # If index exists, create the document and return + try: + index_aliases = client.indices.get_alias(index=key) + for k, v in index_aliases.items(): + if v['aliases'][key]['is_write_index']: + break + client.create(index=k, id=_id, body={'tags': tags}) + return {'_id': _id, '_index': k} + except NotFoundError: + pass + # Create a new index if it doesn't exist + name = f'{key}-000001' + document = MetricDocument(meta={'id': _id}) + document._index = document._index.clone(name) + # Create a new index template if it doesn't exist + if not client.indices.exists_template(name=key): + document._index.settings(**{'lifecycle.rollover_alias': key}) + if retention_policy: + document._index.settings(**{'lifecycle.name': retention_policy}) + # add index pattern is added for Index Lifecycle Management + document._index.as_template(key, f'{key}-*').save(using=client) + document.init(using=client, index=name) + document.meta.index = name + document.tags = tags + document.save(using=client, index=name) + client.indices.put_alias(index=name, name=key, body={'is_write_index': True}) + if retention_policy: + client.indices.put_settings( + body={'lifecycle.name': retention_policy}, index=name + ) + client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name) + client.indices.refresh(index=key) + return document.to_dict(include_meta=True) diff --git a/openwisp_monitoring/db/backends/elasticsearch/queries.py b/openwisp_monitoring/db/backends/elasticsearch/queries.py new file mode 100644 index 000000000..bdae617eb --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/queries.py @@ -0,0 +1,130 @@ +import operator +from copy import deepcopy + +from openwisp_utils.utils import deep_merge_dicts + +from .settings import ADDITIONAL_CHART_OPERATIONS + +default_chart_query = { + 'query': { + 'nested': { + 'path': 'tags', + 'query': { + 'bool': { + 'must': [ + {'match': {'tags.object_id': {'query': '{object_id}'}}}, + {'match': {'tags.content_type': {'query': '{content_type}'}}}, + ] + } + }, + }, + }, + '_source': False, + 'size': 0, + 'aggs': { + 'GroupByTime': { + 'nested': { + 'path': 'points', + 'aggs': { + 'set_range': { + 'filter': { + 'range': { + 'points.time': {'from': 'now-1d/d', 'to': 'now/d'} + } + }, + 'aggs': { + 'time': { + 'date_histogram': { + 'field': 'points.time', + 'fixed_interval': '10m', + 'format': 'date_time_no_millis', + 'order': {'_key': 'desc'}, + }, + 'aggs': { + 'nest': { + 'nested': { + 'path': 'points.fields', + 'aggs': { + '{field_name}': { + 'avg': { + 'field': 'points.fields.{field_name}' + } + } + }, + } + }, + }, + }, + }, + } + }, + } + } + }, +} + +math_map = { + 'uptime': {'operator': '*', 'value': 100}, + 'memory_usage': {'operator': '*', 'value': 100}, + 'CPU_load': {'operator': '*', 'value': 100}, + 'disk_usage': {'operator': '*', 'value': 100}, + 'upload': {'operator': '/', 'value': 1000000000}, + 'download': {'operator': '/', 'value': 1000000000}, +} + +operator_lookup = { + '+': operator.add, + 
'-': operator.sub, + '*': operator.mul, + '/': operator.truediv, +} + +if ADDITIONAL_CHART_OPERATIONS: + assert isinstance(ADDITIONAL_CHART_OPERATIONS, dict) + for value in ADDITIONAL_CHART_OPERATIONS.values(): + assert value['operator'] in operator_lookup + assert isinstance(value['value'], (int, float)) + math_map = deep_merge_dicts(math_map, ADDITIONAL_CHART_OPERATIONS) + + +def _make_query(aggregation=None): + query = deepcopy(default_chart_query) + if aggregation: + query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']['time'][ + 'aggs' + ]['nest']['nested']['aggs'] = aggregation + return query + + +def _get_chart_query(): + aggregation_dict = { + 'uptime': {'uptime': {'avg': {'field': 'points.fields.reachable'}}}, + 'packet_loss': {'packet_loss': {'avg': {'field': 'points.fields.loss'}}}, + 'rtt': { + 'RTT_average': {'avg': {'field': 'points.fields.rtt_avg'}}, + 'RTT_max': {'avg': {'field': 'points.fields.rtt_max'}}, + 'RTT_min': {'avg': {'field': 'points.fields.rtt_min'}}, + }, + 'traffic': { + 'upload': {'sum': {'field': 'points.fields.tx_bytes'}}, + 'download': {'sum': {'field': 'points.fields.rx_bytes'}}, + }, + 'wifi_clients': { + 'wifi_clients': { + 'cardinality': { + 'field': 'points.fields.{field_name}.keyword', + 'missing': 0, + } + } + }, + 'memory': {'memory_usage': {'avg': {'field': 'points.fields.percent_used'}}}, + 'cpu': {'CPU_load': {'avg': {'field': 'points.fields.cpu_usage'}}}, + 'disk': {'disk_usage': {'avg': {'field': 'points.fields.used_disk'}}}, + } + query = {} + for key, value in aggregation_dict.items(): + query[key] = {'elasticsearch': _make_query(value)} + return query + + +chart_query = _get_chart_query() diff --git a/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py new file mode 100644 index 000000000..d4269959e --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py @@ -0,0 +1,41 @@ +# By default age is calculated from the date the index is created but if the +# index has been rolled over than the rollover date is used to calculate the age + +default_rp_policy = { + 'policy': { + 'phases': { + 'hot': { + 'actions': { + 'rollover': {'max_age': '30d', 'max_size': '90G'}, + 'set_priority': {'priority': 100}, + } + }, + 'warm': { + 'min_age': '30d', + 'actions': { + 'forcemerge': {'max_num_segments': 1}, + 'allocate': {'number_of_replicas': 0}, + 'set_priority': {'priority': 50}, + }, + }, + 'cold': {'min_age': '150d', 'actions': {'freeze': {}}}, + 'delete': {'min_age': '335d', 'actions': {'delete': {}}}, + } + } +} + + +def _make_policy(max_age): + return { + 'policy': { + 'phases': { + 'hot': { + 'actions': { + 'rollover': {'max_age': max_age}, + 'set_priority': {'priority': 100}, + } + }, + 'delete': {'actions': {'delete': {}}}, + } + } + } diff --git a/openwisp_monitoring/db/backends/elasticsearch/settings.py b/openwisp_monitoring/db/backends/elasticsearch/settings.py new file mode 100644 index 000000000..8bb6f1159 --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/settings.py @@ -0,0 +1,5 @@ +from django.conf import settings + +ADDITIONAL_CHART_OPERATIONS = getattr( + settings, 'OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS', {} +) diff --git a/openwisp_monitoring/db/utils.py b/openwisp_monitoring/db/backends/elasticsearch/tests/__init__.py similarity index 100% rename from openwisp_monitoring/db/utils.py rename to openwisp_monitoring/db/backends/elasticsearch/tests/__init__.py diff --git 
a/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py new file mode 100644 index 000000000..42c65ea52 --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py @@ -0,0 +1,254 @@ +from datetime import timedelta +from importlib import reload +from unittest.mock import patch + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.utils.timezone import now +from openwisp_monitoring.device.settings import SHORT_RETENTION_POLICY +from openwisp_monitoring.device.tests import DeviceMonitoringTestCase +from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy + +from ... import timeseries_db +from .. import queries as queries_module + + +class TestDatabaseClient(DeviceMonitoringTestCase): + def test_get_query_fields_function(self): + c = self._create_chart(test_data=None, configuration='histogram') + q = c.get_query(fields=['ssh', 'http2', 'apple-music']) + self.assertIn("'ssh': {'sum': {'field': 'points.fields.ssh'}}", str(q)) + self.assertIn("'http2': {'sum': {'field': 'points.fields.http2'}}", str(q)) + self.assertIn( + "'apple-music': {'sum': {'field': 'points.fields.apple-music'}}", str(q) + ) + + def test_default_query(self): + c = self._create_chart(test_data=False) + q = timeseries_db.default_chart_query(tags=True) + self.assertEqual(c.query, q) + + def test_write(self): + timeseries_db.write('test_write', dict(value=2)) + measurement = timeseries_db.read(key='test_write', fields='value')[0] + self.assertEqual(measurement['value'], 2) + + def test_general_write(self): + m = self._create_general_metric(name='Sync test') + m.write(1) + measurement = timeseries_db.read(key='sync_test', fields='value')[0] + self.assertEqual(measurement['value'], 1) + + def test_object_write(self): + om = self._create_object_metric() + om.write(3) + measurement = timeseries_db.read( + key='test_metric', fields='value', tags=om.tags + )[0] + self.assertEqual(measurement['value'], 3) + + def test_general_same_key_different_fields(self): + down = self._create_general_metric( + name='traffic (download)', key='traffic', field_name='download' + ) + down.write(200) + up = self._create_general_metric( + name='traffic (upload)', key='traffic', field_name='upload' + ) + up.write(100) + measurement = timeseries_db.read(key='traffic', fields='download')[0] + self.assertEqual(measurement['download'], 200) + measurement = timeseries_db.read(key='traffic', fields='upload')[0] + self.assertEqual(measurement['upload'], 100) + + def test_object_same_key_different_fields(self): + user = self._create_user() + user_down = self._create_object_metric( + name='traffic (download)', + key='traffic', + field_name='download', + content_object=user, + ) + user_down.write(200) + user_up = self._create_object_metric( + name='traffic (upload)', + key='traffic', + field_name='upload', + content_object=user, + ) + user_up.write(100) + measurement = timeseries_db.read( + key='traffic', fields='download', tags=user_down.tags + )[0] + self.assertEqual(measurement['download'], 200) + measurement = timeseries_db.read( + key='traffic', fields='upload', tags=user_up.tags + )[0] + self.assertEqual(measurement['upload'], 100) + + def test_get_query_1d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='1d') + time_map = c.GROUP_MAP['1d'] + self.assertIn( + "{'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}}", 
str(q) + ) + self.assertIn(f"'fixed_interval': '{time_map}'", str(q)) + + def test_get_query_30d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='30d') + time_map = c.GROUP_MAP['30d'] + self.assertIn( + "{'range': {'points.time': {'from': 'now-30d/d', 'to': 'now/d'}}}", str(q) + ) + self.assertIn(f"'fixed_interval': '{time_map}'", str(q)) + + def test_retention_policy(self): + manage_short_retention_policy() + rp = timeseries_db.get_list_retention_policies() + assert 'default' in rp + assert SHORT_RP in rp + days = f'{int(SHORT_RETENTION_POLICY.split("h")[0]) // 24}d' + self.assertEqual( + rp['short']['policy']['phases']['hot']['actions']['rollover']['max_age'], + days, + ) + + def test_get_query(self): + c = self._create_chart(test_data=False) + m = c.metric + params = dict( + field_name=m.field_name, + key=m.key, + content_type=m.content_type_key, + object_id=m.object_id, + time=c.DEFAULT_TIME, + ) + expected = timeseries_db.get_query( + c.type, + params, + c.DEFAULT_TIME, + c.GROUP_MAP, + query=c.query, + timezone=settings.TIME_ZONE, + ) + self.assertEqual(c.get_query(), expected) + + def test_query_no_index(self): + timeseries_db.delete_metric_data(key='ping') + c = self._create_chart(test_data=False) + q = c.get_query() + self.assertEqual(timeseries_db.query(q, index='ping'), {}) + self.assertEqual(timeseries_db.get_list_query(q), []) + + def test_1d_chart_data(self): + c = self._create_chart() + data = c.read(time='1d') + self.assertIn('x', data) + self.assertEqual(len(data['x']), 144) + self.assertIn('traces', data) + self.assertEqual(9.0, data['traces'][0][1][-1]) + # Test chart with old data has same length + m = self._create_general_metric(name='dummy') + c = self._create_chart(metric=m, test_data=False) + m.write(6.0, time=now() - timedelta(hours=23)) + data = c.read(time='1d') + self.assertIn('x', data) + self.assertEqual(len(data['x']), 144) + self.assertIn('traces', data) + self.assertIn(6.0, data['traces'][0][1]) + + def test_delete_metric_data(self): + obj = self._create_user() + om = self._create_object_metric(name='Logins', content_object=obj) + om.write(100) + self.assertEqual(om.read()[0]['value'], 100) + timeseries_db.delete_metric_data(key=om.key, tags=om.tags) + + def test_invalid_query(self): + q = timeseries_db.default_chart_query() + q['query']['nested']['query']['must'] = 'invalid' + try: + timeseries_db.validate_query(q) + except ValidationError as e: + self.assertIn('ParsingException: [bool] malformed query', str(e)) + + def test_non_aggregation_query(self): + q = {'query': timeseries_db.default_chart_query()['query']} + self.assertEqual(timeseries_db.get_list_query(q), []) + + def test_timestamp_precision(self): + c = self._create_chart() + points = timeseries_db.get_list_query(c.get_query(), precision='ms') + self.assertIsInstance(points[0]['time'], float) + points = timeseries_db.get_list_query(c.get_query(), precision='s') + self.assertIsInstance(points[0]['time'], int) + + def create_docs_single_index(self): + m = self._create_object_metric(name='dummy') + m.write(1) + d = self._create_device(organization=self._create_org()) + m2 = self._create_object_metric(name='dummy', content_object=d) + m2.write(1) + self.assertEqual(len(timeseries_db.get_db.indices.get_alias(name='dummy')), 1) + + def test_additional_chart_operations_setting(self): + modify_operators = { + 'upload': {'operator': '/', 'value': 1000000}, + 'download': {'operator': '/', 'value': 1000000}, + } + path = 
'openwisp_monitoring.db.backends.elasticsearch.queries.ADDITIONAL_CHART_OPERATIONS' + with patch.dict(path, modify_operators, clear=True): + queries = reload(queries_module) + self.assertEqual(queries.ADDITIONAL_CHART_OPERATIONS, modify_operators) + self.assertEqual(queries.math_map['upload'], modify_operators['upload']) + self.assertEqual(queries.math_map['download'], modify_operators['download']) + + def test_read(self): + c = self._create_chart() + data = c.read() + key = c.metric.field_name + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 168) + charts = data['traces'] + self.assertEqual(charts[0][0], key) + self.assertEqual(len(charts[0][1]), 168) + self.assertTrue(all(elem in charts[0][1] for elem in [3, 6, 9])) + + def test_read_multiple(self): + c = self._create_chart(test_data=None, configuration='multiple_test') + m1 = c.metric + m2 = self._create_object_metric( + name='test metric 2', + key='test_metric', + field_name='value2', + content_object=m1.content_object, + ) + now_ = now() + for n in range(0, 3): + time = now_ - timedelta(days=n) + m1.write(n + 1, time=time) + m2.write(n + 2, time=time) + data = c.read() + f1 = m1.field_name + f2 = 'value2' + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 168) + charts = data['traces'] + self.assertIn(f1, charts[0][0]) + self.assertIn(f2, charts[1][0]) + self.assertEqual(len(charts[0][1]), 168) + self.assertEqual(len(charts[1][1]), 168) + self.assertTrue(all(elem in charts[0][1] for elem in [3, 2, 1])) + self.assertTrue(all(elem in charts[1][1] for elem in [4, 3, 2])) + + def test_ilm_disabled(self): + with patch.object(timeseries_db, 'ilm_enabled', False): + self.assertFalse(timeseries_db.ilm_enabled) + self.assertIsNone( + timeseries_db.create_or_alter_retention_policy(name='default') + ) + self.assertIsNone(timeseries_db.get_list_retention_policies()) diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py index b05477ecd..670f2e912 100644 --- a/openwisp_monitoring/db/backends/influxdb/client.py +++ b/openwisp_monitoring/db/backends/influxdb/client.py @@ -51,7 +51,6 @@ class DatabaseClient(object): backend_name = 'influxdb' def __init__(self, db_name=None): - self._db = None self.db_name = db_name or TIMESERIES_DB['NAME'] self.client_error = DatabaseException.client_error @@ -163,7 +162,7 @@ def read(self, key, fields, tags, **kwargs): q = f'{q} LIMIT {limit}' return list(self.query(q, precision='s').get_points()) - def get_list_query(self, query, precision='s'): + def get_list_query(self, query, precision='s', **kwargs): return list(self.query(query, precision=precision).get_points()) def get_list_retention_policies(self): @@ -268,6 +267,7 @@ def __transform_field(self, field, function, operation=None): def _get_top_fields( self, + default_query, query, params, chart_type, @@ -275,9 +275,15 @@ def _get_top_fields( number, time, timezone=settings.TIME_ZONE, + get_fields=True, ): + """ + Returns top fields if ``get_fields`` set to ``True`` (default) + else it returns points containing the top fields. 
+ """ + q = default_query.replace('{field_name}', '{fields}') q = self.get_query( - query=query, + query=q, params=params, chart_type=chart_type, group_map=group_map, @@ -286,7 +292,7 @@ def _get_top_fields( time=time, timezone=timezone, ) - res = list(self.query(q, precision='s').get_points()) + res = self.get_list_query(q) if not res: return [] res = res[0] @@ -296,4 +302,31 @@ def _get_top_fields( keys = list(sorted_dict.keys()) keys.reverse() top = keys[0:number] - return [item.replace('sum_', '') for item in top] + top_fields = [item.replace('sum_', '') for item in top] + if get_fields: + return top_fields + query = self.get_query( + query=query, + params=params, + chart_type=chart_type, + group_map=group_map, + summary=True, + fields=top_fields, + time=time, + timezone=timezone, + ) + return self.get_list_query(query) + + def default_chart_query(self, tags): + q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'" + if tags: + q += " AND content_type = '{content_type}' AND object_id = '{object_id}'" + return q + + def _device_data(self, key, tags, rp, **kwargs): + """ returns last snapshot of ``device_data`` """ + query = ( + f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' " + "ORDER BY time DESC LIMIT 1" + ) + return self.get_list_query(query, precision=None) diff --git a/openwisp_monitoring/db/backends/influxdb/queries.py b/openwisp_monitoring/db/backends/influxdb/queries.py index 61c835d39..7bc5b42e5 100644 --- a/openwisp_monitoring/db/backends/influxdb/queries.py +++ b/openwisp_monitoring/db/backends/influxdb/queries.py @@ -58,12 +58,3 @@ ) }, } - -default_chart_query = [ - "SELECT {field_name} FROM {key} WHERE time >= '{time}'", - " AND content_type = '{content_type}' AND object_id = '{object_id}'", -] - -device_data_query = ( - "SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1" -) diff --git a/openwisp_monitoring/db/backends/influxdb/tests/__init__.py b/openwisp_monitoring/db/backends/influxdb/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openwisp_monitoring/db/backends/influxdb/tests.py b/openwisp_monitoring/db/backends/influxdb/tests/client_tests.py similarity index 77% rename from openwisp_monitoring/db/backends/influxdb/tests.py rename to openwisp_monitoring/db/backends/influxdb/tests/client_tests.py index b94970d8c..46a2c10bf 100644 --- a/openwisp_monitoring/db/backends/influxdb/tests.py +++ b/openwisp_monitoring/db/backends/influxdb/tests/client_tests.py @@ -1,5 +1,6 @@ -from datetime import datetime, timedelta +from datetime import date, datetime, timedelta +from django.conf import settings from django.core.exceptions import ValidationError from django.test import TestCase from django.utils.timezone import now @@ -8,7 +9,7 @@ from openwisp_monitoring.monitoring.tests import TestMonitoringMixin from swapper import load_model -from .. import timeseries_db +from ... 
import timeseries_db Chart = load_model('monitoring', 'Chart') Notification = load_model('openwisp_notifications', 'Notification') @@ -38,16 +39,6 @@ def test_get_custom_query(self): q = c.get_query(query=custom_q, fields=['SUM(*)']) self.assertIn('SELECT SUM(*) FROM', q) - def test_is_aggregate_bug(self): - m = self._create_object_metric(name='summary_avg') - c = Chart(metric=m, configuration='dummy') - self.assertFalse(timeseries_db._is_aggregate(c.query)) - - def test_is_aggregate_fields_function(self): - m = self._create_object_metric(name='is_aggregate_func') - c = Chart(metric=m, configuration='uptime') - self.assertTrue(timeseries_db._is_aggregate(c.query)) - def test_get_query_fields_function(self): c = self._create_chart(test_data=None, configuration='histogram') q = c.get_query(fields=['ssh', 'http2', 'apple-music']) @@ -142,21 +133,6 @@ def test_object_same_key_different_fields(self): measurement = timeseries_db.get_list_query(q)[0] self.assertEqual(measurement['upload'], 100) - def test_delete_metric_data(self): - m = self._create_general_metric(name='test_metric') - m.write(100) - self.assertEqual(m.read()[0]['value'], 100) - timeseries_db.delete_metric_data(key=m.key) - self.assertEqual(m.read(), []) - om = self._create_object_metric(name='dummy') - om.write(50) - m.write(100) - self.assertEqual(m.read()[0]['value'], 100) - self.assertEqual(om.read()[0]['value'], 50) - timeseries_db.delete_metric_data() - self.assertEqual(m.read(), []) - self.assertEqual(om.read(), []) - def test_get_query_1d(self): c = self._create_chart(test_data=None, configuration='uptime') q = c.get_query(time='1d') @@ -189,27 +165,11 @@ def test_query_set(self): ) self.assertEqual(c.query, expected) self.assertEqual( - ''.join(timeseries_db.queries.default_chart_query[0:2]), c._default_query + ''.join(timeseries_db.default_chart_query(tags=c.metric.tags)), + c._default_query, ) c.metric.object_id = None - self.assertEqual(timeseries_db.queries.default_chart_query[0], c._default_query) - - def test_read_order(self): - m = self._create_general_metric(name='dummy') - m.write(30) - m.write(40, time=now() - timedelta(days=2)) - with self.subTest('Test ascending read order'): - metric_data = m.read(limit=2, order='time') - self.assertEqual(metric_data[0]['value'], 40) - self.assertEqual(metric_data[1]['value'], 30) - with self.subTest('Test descending read order'): - metric_data = m.read(limit=2, order='-time') - self.assertEqual(metric_data[0]['value'], 30) - self.assertEqual(metric_data[1]['value'], 40) - with self.subTest('Test invalid read order'): - with self.assertRaises(timeseries_db.client_error) as e: - metric_data = m.read(limit=2, order='invalid') - self.assertIn('Invalid order "invalid" passed.', str(e)) + self.assertEqual(timeseries_db.default_chart_query(tags=None), c._default_query) def test_read_with_rp(self): self._create_admin() @@ -240,3 +200,59 @@ def test_metric_write_microseconds_precision(self): m.write('00:14:5c:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235142)) m.write('00:23:4a:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235152)) self.assertEqual(len(m.read()), 2) + + def test_get_query(self): + c = self._create_chart(test_data=False) + m = c.metric + now_ = now() + today = date(now_.year, now_.month, now_.day) + time = today - timedelta(days=6) + expected = c.query.format( + field_name=m.field_name, + key=m.key, + content_type=m.content_type_key, + object_id=m.object_id, + time=str(time), + ) + expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE) + 
self.assertEqual(c.get_query(), expected) + + def test_read(self): + c = self._create_chart() + data = c.read() + key = c.metric.field_name + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 3) + charts = data['traces'] + self.assertEqual(charts[0][0], key) + self.assertEqual(len(charts[0][1]), 3) + self.assertEqual(charts[0][1], [3, 6, 9]) + + def test_read_multiple(self): + c = self._create_chart(test_data=None, configuration='multiple_test') + m1 = c.metric + m2 = self._create_object_metric( + name='test metric 2', + key='test_metric', + field_name='value2', + content_object=m1.content_object, + ) + now_ = now() + for n in range(0, 3): + time = now_ - timedelta(days=n) + m1.write(n + 1, time=time) + m2.write(n + 2, time=time) + data = c.read() + f1 = m1.field_name + f2 = 'value2' + self.assertIn('x', data) + self.assertIn('traces', data) + self.assertEqual(len(data['x']), 3) + charts = data['traces'] + self.assertIn(f1, charts[0][0]) + self.assertIn(f2, charts[1][0]) + self.assertEqual(len(charts[0][1]), 3) + self.assertEqual(len(charts[1][1]), 3) + self.assertEqual(charts[0][1], [3, 2, 1]) + self.assertEqual(charts[1][1], [4, 3, 2]) diff --git a/openwisp_monitoring/db/tests.py b/openwisp_monitoring/db/tests.py new file mode 100644 index 000000000..c0103c869 --- /dev/null +++ b/openwisp_monitoring/db/tests.py @@ -0,0 +1,55 @@ +from datetime import timedelta + +from django.utils.timezone import now +from swapper import load_model + +from . import timeseries_db +from .backends import load_backend_module + +Chart = load_model('monitoring', 'Chart') +tests = load_backend_module(module='tests.client_tests') + + +class TestDatabaseClient(tests.TestDatabaseClient): + def test_is_aggregate_bug(self): + m = self._create_object_metric(name='summary_avg') + c = Chart(metric=m, configuration='dummy') + self.assertFalse(timeseries_db._is_aggregate(c.query)) + + def test_is_aggregate_fields_function(self): + m = self._create_object_metric(name='is_aggregate_func') + c = Chart(metric=m, configuration='uptime') + self.assertTrue(timeseries_db._is_aggregate(c.query)) + + def test_delete_metric_data(self): + m = self._create_general_metric(name='test_metric') + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + timeseries_db.delete_metric_data(key=m.key) + self.assertEqual(m.read(), []) + om = self._create_object_metric(name='dummy') + om.write(50) + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + self.assertEqual(om.read()[0]['value'], 50) + timeseries_db.delete_metric_data() + self.assertEqual(m.read(), []) + self.assertEqual(om.read(), []) + + def test_read_order(self): + timeseries_db.delete_metric_data() + m = self._create_general_metric(name='dummy') + m.write(40, time=now() - timedelta(days=2)) + m.write(30) + with self.subTest('Test ascending read order'): + metric_data = m.read(limit=2, order='time') + self.assertEqual(metric_data[0]['value'], 40) + self.assertEqual(metric_data[1]['value'], 30) + with self.subTest('Test descending read order'): + metric_data = m.read(limit=2, order='-time') + self.assertEqual(metric_data[0]['value'], 30) + self.assertEqual(metric_data[1]['value'], 40) + with self.subTest('Test invalid read order'): + with self.assertRaises(timeseries_db.client_error) as e: + metric_data = m.read(limit=2, order='invalid') + self.assertIn('Invalid order "invalid" passed.', str(e)) diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py index 0b7cc7387..e979543c6 100644 --- 
a/openwisp_monitoring/device/base/models.py +++ b/openwisp_monitoring/device/base/models.py @@ -20,7 +20,7 @@ from openwisp_utils.base import TimeStampedEditableModel -from ...db import device_data_query, timeseries_db +from ...db import timeseries_db from ...monitoring.signals import threshold_crossed from .. import settings as app_settings from ..schema import schema @@ -96,8 +96,10 @@ def data(self): """ if self.__data: return self.__data - q = device_data_query.format(SHORT_RP, self.__key, self.pk) - points = timeseries_db.get_list_query(q, precision=None) + # skipped this due to performance and inverted index issues! + points = timeseries_db._device_data( + rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data' + ) if not points: return None self.data_timestamp = points[0]['time'] diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index 6dc810177..48946503b 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -19,7 +19,7 @@ from openwisp_utils.base import TimeStampedEditableModel -from ...db import default_chart_query, timeseries_db +from ...db import timeseries_db from ..charts import ( CHART_CONFIGURATION_CHOICES, DEFAULT_COLORS, @@ -329,10 +329,8 @@ def top_fields(self): @property def _default_query(self): - q = default_chart_query[0] - if self.metric.object_id: - q += default_chart_query[1] - return q + tags = True if self.metric.object_id else False + return timeseries_db.default_chart_query(tags) def get_query( self, @@ -353,10 +351,10 @@ def get_top_fields(self, number): Returns list of top ``number`` of fields (highest sum) of a measurement in the specified time range (descending order). """ - q = self._default_query.replace('{field_name}', '{fields}') params = self._get_query_params(self.DEFAULT_TIME) return timeseries_db._get_top_fields( - query=q, + default_query=self._default_query, + query=self.get_query(), chart_type=self.type, group_map=self.GROUP_MAP, number=number, @@ -401,16 +399,23 @@ def read( try: query_kwargs = dict(time=time, timezone=timezone) if self.top_fields: - fields = self.get_top_fields(self.top_fields) - data_query = self.get_query(fields=fields, **query_kwargs) - summary_query = self.get_query( - fields=fields, summary=True, **query_kwargs + points = summary = timeseries_db._get_top_fields( + default_query=self._default_query, + chart_type=self.type, + group_map=self.GROUP_MAP, + number=self.top_fields, + params=self._get_query_params(self.DEFAULT_TIME), + time=time, + query=self.query, + get_fields=False, ) else: data_query = self.get_query(**query_kwargs) summary_query = self.get_query(summary=True, **query_kwargs) - points = timeseries_db.get_list_query(data_query) - summary = timeseries_db.get_list_query(summary_query) + points = timeseries_db.get_list_query(data_query, key=self.metric.key) + summary = timeseries_db.get_list_query( + summary_query, key=self.metric.key + ) except timeseries_db.client_error as e: logging.error(e, exc_info=True) raise e @@ -463,7 +468,11 @@ def _round(value, decimal_places): control = 1.0 / 10 ** decimal_places if value < control: decimal_places += 2 - return round(value, decimal_places) + value = round(value, decimal_places) + # added for Elasticsearch division outputs (traffic metric) + if isinstance(value, float) and value.is_integer(): + value = int(value) + return value class AbstractAlertSettings(TimeStampedEditableModel): diff --git a/openwisp_monitoring/monitoring/tests/__init__.py 
b/openwisp_monitoring/monitoring/tests/__init__.py index 0b949de63..d23d462e2 100644 --- a/openwisp_monitoring/monitoring/tests/__init__.py +++ b/openwisp_monitoring/monitoring/tests/__init__.py @@ -7,6 +7,7 @@ from ...db import timeseries_db from ...db.backends import TIMESERIES_DB +from ...db.backends.elasticsearch.queries import _make_query from ..charts import register_chart, unregister_chart start_time = now() @@ -15,6 +16,7 @@ Metric = load_model('monitoring', 'Metric') AlertSettings = load_model('monitoring', 'AlertSettings') +# TODO: Queries relating to top_fields aren't yet # this custom chart configuration is used for automated testing purposes charts = { 'histogram': { @@ -28,7 +30,10 @@ "SELECT {fields|SUM|/ 1} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query( + {'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}} + ), }, }, 'dummy': { @@ -45,7 +50,7 @@ 'description': 'Bugged chart for testing purposes.', 'unit': 'bugs', 'order': 999, - 'query': {'influxdb': "BAD"}, + 'query': {'influxdb': "BAD", 'elasticsearch': "BAD"}, }, 'default': { 'type': 'line', @@ -57,7 +62,8 @@ 'influxdb': ( "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query(), }, }, 'multiple_test': { @@ -70,7 +76,13 @@ 'influxdb': ( "SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query( + { + '{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}, + 'value2': {'sum': {'field': 'points.fields.value2'}}, + } + ), }, }, 'mean_test': { @@ -83,7 +95,8 @@ 'influxdb': ( "SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query(), }, }, 'sum_test': { @@ -96,7 +109,10 @@ 'influxdb': ( "SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query( + {'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}} + ), }, }, 'top_fields_mean': { @@ -110,7 +126,8 @@ "SELECT {fields|MEAN} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'elasticsearch': _make_query(), }, }, } @@ -132,9 +149,9 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): - timeseries_db.drop_database() for key in charts.keys(): unregister_chart(key) + timeseries_db.drop_database() def tearDown(self): timeseries_db.delete_metric_data() diff --git a/openwisp_monitoring/monitoring/tests/test_charts.py b/openwisp_monitoring/monitoring/tests/test_charts.py index 33cb1548f..9241e2dd4 100644 --- a/openwisp_monitoring/monitoring/tests/test_charts.py +++ b/openwisp_monitoring/monitoring/tests/test_charts.py @@ -2,7 +2,6 @@ from datetime import date, timedelta from unittest.mock import patch -from django.conf import settings from django.core.exceptions import ImproperlyConfigured, ValidationError from django.test import TestCase from django.utils.timezone import now @@ -25,18 +24,6 @@ class TestCharts(TestMonitoringMixin, TestCase): Tests for functionalities related to charts """ - def test_read(self): - c = self._create_chart() - data = c.read() - key = c.metric.field_name - 
self.assertIn('x', data) - self.assertIn('traces', data) - self.assertEqual(len(data['x']), 3) - charts = data['traces'] - self.assertEqual(charts[0][0], key) - self.assertEqual(len(charts[0][1]), 3) - self.assertEqual(charts[0][1], [3, 6, 9]) - def test_read_summary_avg(self): m = self._create_object_metric(name='summary_avg') c = self._create_chart(metric=m, test_data=False, configuration='mean_test') @@ -133,34 +120,6 @@ def test_read_summary_top_fields_acid(self): self.assertEqual(data['summary'], {'google': 87500000, 'facebook': 37503000}) self.assertEqual(c.get_top_fields(2), ['google', 'facebook']) - def test_read_multiple(self): - c = self._create_chart(test_data=None, configuration='multiple_test') - m1 = c.metric - m2 = self._create_object_metric( - name='test metric 2', - key='test_metric', - field_name='value2', - content_object=m1.content_object, - ) - now_ = now() - for n in range(0, 3): - time = now_ - timedelta(days=n) - m1.write(n + 1, time=time) - m2.write(n + 2, time=time) - data = c.read() - f1 = m1.field_name - f2 = 'value2' - self.assertIn('x', data) - self.assertIn('traces', data) - self.assertEqual(len(data['x']), 3) - charts = data['traces'] - self.assertIn(f1, charts[0][0]) - self.assertIn(f2, charts[1][0]) - self.assertEqual(len(charts[0][1]), 3) - self.assertEqual(len(charts[1][1]), 3) - self.assertEqual(charts[0][1], [3, 2, 1]) - self.assertEqual(charts[1][1], [4, 3, 2]) - def test_json(self): c = self._create_chart() data = c.read() @@ -180,22 +139,6 @@ def test_read_bad_query(self): else: self.fail('ValidationError not raised') - def test_get_query(self): - c = self._create_chart(test_data=False) - m = c.metric - now_ = now() - today = date(now_.year, now_.month, now_.day) - time = today - timedelta(days=6) - expected = c.query.format( - field_name=m.field_name, - key=m.key, - content_type=m.content_type_key, - object_id=m.object_id, - time=str(time), - ) - expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE) - self.assertEqual(c.get_query(), expected) - def test_description(self): c = self._create_chart(test_data=False) self.assertEqual(c.description, 'Dummy chart for testing purposes.') diff --git a/requirements.txt b/requirements.txt index a9e1c58a8..e362da884 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ openwisp-controller~=0.7.post1 -influxdb>=5.2,<5.3 django-notifications-hq>=1.6,<1.7 django-celery-email>=3.0.0,<3.1 djangorestframework>=3.11,<3.12 diff --git a/setup.py b/setup.py index eb92ce137..58b0e48a5 100755 --- a/setup.py +++ b/setup.py @@ -54,6 +54,10 @@ def get_install_requires(): include_package_data=True, zip_safe=False, install_requires=get_install_requires(), + extras_require={ + 'elasticsearch': ['elasticsearch-dsl>=7.0.0,<8.0.0'], + 'influxdb': ['influxdb>=5.2,<5.3'], + }, classifiers=[ 'Development Status :: 3 - Alpha', 'Environment :: Web Environment', @@ -63,7 +67,7 @@ def get_install_requires(): 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', 'Operating System :: OS Independent', 'Framework :: Django', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', ], ) diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py index 7e0c30acb..c31d57c22 100644 --- a/tests/openwisp2/settings.py +++ b/tests/openwisp2/settings.py @@ -16,7 +16,7 @@ } } -TIMESERIES_DATABASE = { +INFLUXDB_SETTINGS = { 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', 'USER': 
'openwisp', 'PASSWORD': 'openwisp', @@ -25,6 +25,20 @@ 'PORT': '8086', } +ELASTICSEARCH_SETTINGS = { + 'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch', + 'USER': 'openwisp', + 'PASSWORD': 'openwisp', + 'NAME': 'openwisp2', + 'HOST': os.getenv('ELASTICSEARCH_HOST', 'localhost'), + 'PORT': '9200', +} + +if os.environ.get('elasticsearch', False): + TIMESERIES_DATABASE = ELASTICSEARCH_SETTINGS +else: + TIMESERIES_DATABASE = INFLUXDB_SETTINGS + SECRET_KEY = 'fn)t*+$)ugeyip6-#txyy$5wf2ervc0d2n#h)qb)y5@ly$t*@w' INSTALLED_APPS = [