diff --git a/Dockerfile b/Dockerfile
index 44e0b890a..fff279ac6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -18,6 +18,7 @@ WORKDIR /opt/openwisp/tests/
ENV NAME=openwisp-monitoring \
PYTHONBUFFERED=1 \
INFLUXDB_HOST=influxdb \
- REDIS_HOST=redis
+ REDIS_HOST=redis \
+ ELASTICSEARCH_HOST=es01
CMD ["sh", "docker-entrypoint.sh"]
EXPOSE 8000
diff --git a/README.rst b/README.rst
index 7a945b055..4f0c07043 100644
--- a/README.rst
+++ b/README.rst
@@ -74,6 +74,7 @@ Available Features
* Collects and displays `device status <#device-status>`_ information like uptime, RAM status, CPU load averages,
Interface properties and addresses, WiFi interface status and associated clients,
Neighbors information, DHCP Leases, Disk/Flash status
+* Collection of monitoring information in a timeseries database (`InfluxDB `_ and `Elasticsearch `_ are currently supported)
* Monitoring charts for uptime, packet loss, round trip time (latency), associated wifi clients, interface traffic,
RAM usage, CPU load, flash/disk usage
* Charts can be viewed at resolutions of 1 day, 3 days, a week, a month and a year
@@ -108,6 +109,8 @@ beforehand.
In case you prefer not to use Docker you can `install InfluxDB `_
and Redis from your repositories, but keep in mind that the version packaged by your distribution may be different.
+If you wish to use ``Elasticsearch`` for storing and retrieving timeseries data then `install Elasticsearch `_.
+
Install spatialite and sqlite:
.. code-block:: shell
@@ -165,6 +168,20 @@ Follow the setup instructions of `openwisp-controller
'PORT': '8086',
}
+In case, you wish to use ``Elasticsearch`` for timeseries data storage and retrieval,
+make use of the following settings
+
+.. code-block:: python
+
+ TIMESERIES_DATABASE = {
+ 'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
+ 'USER': 'openwisp',
+ 'PASSWORD': 'openwisp',
+ 'NAME': 'openwisp2',
+ 'HOST': 'localhost',
+ 'PORT': '9200',
+ }
+
``urls.py``:
.. code-block:: python
@@ -461,6 +478,9 @@ This data is only used to assess the recent status of devices, keeping
it for a long time would not add much benefit and would cost a lot more
in terms of disk space.
+**Note**: In case you use ``Elasticsearch`` then time shall be taken as integral multiple of a day.
+That means the time ``36h0m0s`` shall be interpreted as ``24h0m0s`` (integral multiple of a day).
+
``OPENWISP_MONITORING_AUTO_PING``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -764,11 +784,21 @@ MB (megabytes) instead of GB (Gigabytes) you can use:
"SUM(rx_bytes) / 1000000 AS download FROM {key} "
"WHERE time >= '{time}' AND content_type = '{content_type}' "
"AND object_id = '{object_id}' GROUP BY time(1d)"
- )
+ ),
+ 'elasticsearch': _make_query({
+ 'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
+ 'download': {'avg': {'field': 'points.fields.rx_bytes'}},
+ })
},
}
}
+ # Please declare the operations separately in case you use elasticsearch as done below
+ OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = {
+ 'upload': {'operator': '/', 'value': 1000000},
+ 'download': {'operator': '/', 'value': 1000000},
+ }
+
Or if you want to define a new chart configuration, which you can then
call in your custom code (eg: a custom check class), you can do so as follows:
@@ -776,6 +806,8 @@ call in your custom code (eg: a custom check class), you can do so as follows:
from django.utils.translation import gettext_lazy as _
+ from openwisp_monitoring.db.backends.elasticsearch import _make_query
+
OPENWISP_MONITORING_CHARTS = {
'ram': {
'type': 'line',
@@ -789,7 +821,12 @@ call in your custom code (eg: a custom check class), you can do so as follows:
"MEAN(buffered) AS buffered FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}' "
"GROUP BY time(1d)"
- )
+ ),
+ 'elasticsearch': _make_query({
+ 'total': {'avg': {'field': 'points.fields.total'}},
+ 'free': {'avg': {'field': 'points.fields.free'}},
+ 'buffered': {'avg': {'field': 'points.fields.buffered'}},
+ })
},
}
}
diff --git a/docker-compose.yml b/docker-compose.yml
index 6386355ed..795fed19a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,6 +11,8 @@ services:
depends_on:
- influxdb
- redis
+ - es01
+ - es02
influxdb:
image: influxdb:1.8-alpine
@@ -22,6 +24,45 @@ services:
INFLUXDB_DB: openwisp2
INFLUXDB_USER: openwisp
INFLUXDB_USER_PASSWORD: openwisp
+ # clustered version of elasticsearch is used as that might be used in production
+ es01:
+ image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
+ container_name: es01
+ environment:
+ - "node.name=es01"
+ - "discovery.seed_hosts=es02"
+ - "cluster.initial_master_nodes=es01,es02"
+ - "cluster.name=openwisp2"
+ - "bootstrap.memory_lock=true"
+ - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+ ulimits:
+ memlock:
+ soft: -1
+ hard: -1
+ volumes:
+ - esdata01:/usr/share/elasticsearch/data
+ ports:
+ - 9200:9200
+ networks:
+ - esnet
+ es02:
+ image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
+ container_name: es02
+ environment:
+ - "node.name=es02"
+ - "discovery.seed_hosts=es01"
+ - "cluster.initial_master_nodes=es01,es02"
+ - "cluster.name=openwisp2"
+ - "bootstrap.memory_lock=true"
+ - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+ ulimits:
+ memlock:
+ soft: -1
+ hard: -1
+ volumes:
+ - esdata02:/usr/share/elasticsearch/data
+ networks:
+ - esnet
redis:
image: redis:5.0-alpine
@@ -31,3 +72,10 @@ services:
volumes:
influxdb-data: {}
+ esdata01:
+ driver: local
+ esdata02:
+ driver: local
+
+networks:
+ esnet:
diff --git a/openwisp_monitoring/db/__init__.py b/openwisp_monitoring/db/__init__.py
index 063d2d8f7..64510ebc5 100644
--- a/openwisp_monitoring/db/__init__.py
+++ b/openwisp_monitoring/db/__init__.py
@@ -1,7 +1,5 @@
from .backends import timeseries_db
chart_query = timeseries_db.queries.chart_query
-default_chart_query = timeseries_db.queries.default_chart_query
-device_data_query = timeseries_db.queries.device_data_query
-__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query']
+__all__ = ['timeseries_db', 'chart_query']
diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py
index 715b1113c..b04e09db6 100644
--- a/openwisp_monitoring/db/backends/__init__.py
+++ b/openwisp_monitoring/db/backends/__init__.py
@@ -48,7 +48,8 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
except ImportError as e:
# The database backend wasn't found. Display a helpful error message
# listing all built-in database backends.
- builtin_backends = ['influxdb']
+ builtin_backends = ['influxdb', 'elasticsearch']
+ raise e
if backend_name not in [
f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
]:
diff --git a/openwisp_monitoring/db/backends/elasticsearch/__init__.py b/openwisp_monitoring/db/backends/elasticsearch/__init__.py
new file mode 100644
index 000000000..5eb061d1b
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/__init__.py
@@ -0,0 +1,3 @@
+from .queries import _make_query
+
+__all__ = ['_make_query']
diff --git a/openwisp_monitoring/db/backends/elasticsearch/client.py b/openwisp_monitoring/db/backends/elasticsearch/client.py
new file mode 100644
index 000000000..574744e79
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -0,0 +1,466 @@
+import json
+import logging
+import re
+from collections import Counter
+from copy import deepcopy
+from datetime import datetime, timedelta
+
+from django.conf import settings
+from django.core.exceptions import ValidationError
+from django.utils.functional import cached_property
+from elasticsearch import Elasticsearch
+from elasticsearch.exceptions import ElasticsearchException, NotFoundError
+from elasticsearch_dsl import Search
+from elasticsearch_dsl.connections import connections
+from pytz import timezone as tz
+
+from openwisp_utils.utils import deep_merge_dicts
+
+from ...exceptions import TimeseriesWriteException
+from .. import TIMESERIES_DB
+from .index import MetricDocument, Point, find_metric
+from .queries import default_chart_query, math_map, operator_lookup
+from .retention_policies import _make_policy, default_rp_policy
+
+logger = logging.getLogger(__name__)
+
+
+class DatabaseClient(object):
+ _AGGREGATE = [
+ 'filters',
+ 'children',
+ 'parent',
+ 'date_histogram',
+ 'auto_date_histogram',
+ 'date_range',
+ 'geo_distance',
+ 'geohash_grid',
+ 'geotile_grid',
+ 'global',
+ 'geo_centroid',
+ 'global',
+ 'ip_range',
+ 'missing',
+ 'nested',
+ 'range',
+ 'reverse_nested',
+ 'significant_terms',
+ 'significant_text',
+ 'sampler',
+ 'terms',
+ 'diversified_sampler',
+ 'composite',
+ 'top_hits',
+ 'avg',
+ 'weighted_avg',
+ 'cardinality',
+ 'extended_stats',
+ 'geo_bounds',
+ 'max',
+ 'min',
+ 'percentiles',
+ 'percentile_ranks',
+ 'scripted_metric',
+ 'stats',
+ 'sum',
+ 'value_count',
+ ]
+ backend_name = 'elasticsearch'
+
+ def __init__(self, db_name='metric'):
+ self.db_name = db_name or TIMESERIES_DB['NAME']
+ self.client_error = ElasticsearchException
+
+ def create_database(self):
+ """ creates connection to elasticsearch """
+ connections.create_connection(
+ hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"]
+ )
+ db = self.get_db
+ # Skip if support for Index Lifecycle Management is disabled or no privileges
+ self.ilm_enabled = db.ilm.start()['acknowledged']
+ self.create_or_alter_retention_policy(name='default')
+
+ def drop_database(self):
+ """ deletes all indices """
+ logger.debug('Deleted all indices data from Elasticsearch')
+
+ @cached_property
+ def get_db(self):
+ """ Returns an ``Elasticsearch Client`` instance """
+ return Elasticsearch(
+ hosts=[{'host': TIMESERIES_DB['HOST'], 'port': TIMESERIES_DB['PORT']}],
+ http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
+ retry_on_timeout=True,
+ # sniff before doing anything
+ sniff_on_start=True,
+ # refresh nodes after a node fails to respond
+ sniff_on_connection_fail=True,
+ )
+
+ def create_or_alter_retention_policy(self, name, duration=None):
+ """
+ creates or alters existing retention policy if necessary
+
+ Note: default retention policy can't be altered with this function
+ """
+ if not self.ilm_enabled:
+ return
+ ilm = self.get_db.ilm
+ if not duration:
+ try:
+ ilm.get_lifecycle(policy='default')
+ except NotFoundError:
+ ilm.put_lifecycle(policy='default', body=default_rp_policy)
+ return
+ days = f'{int(duration.split("h")[0]) // 24}d'
+ duration_changed = False
+ try:
+ policy = ilm.get_lifecycle(policy=name)
+ exists = True
+ current_duration = policy[name]['policy']['phases']['hot']['actions'][
+ 'rollover'
+ ]['max_age']
+ duration_changed = current_duration != days
+ except NotFoundError:
+ exists = False
+ if not exists or duration_changed:
+ policy = _make_policy(days)
+ ilm.put_lifecycle(policy=name, body=policy)
+
+ def get_list_retention_policies(self):
+ if not self.ilm_enabled:
+ return
+ return self.get_db.ilm.get_lifecycle()
+
+ def query(self, query, key=None, **kwargs):
+ try:
+ response = (
+ Search(using=self.get_db, index=key)
+ .update_from_dict(query)
+ .execute()
+ .to_dict()
+ )
+ if not response['hits']['total']['value']:
+ return {}
+ return response
+ except NotFoundError:
+ return {}
+
+ def write(self, name, values, **kwargs):
+ rp = kwargs.get('retention_policy')
+ tags = kwargs.get('tags')
+ timestamp = kwargs.get('timestamp')
+ try:
+ metric_id, index = find_metric(self.get_db, name, tags, rp, add=True)
+ metric_index = MetricDocument.get(metric_id, index=index, using=self.get_db)
+ point = Point(fields=values, time=timestamp or datetime.now())
+ metric_index.points.append(point)
+ metric_index.save()
+ except Exception as exception:
+ logger.warning(f'got exception while writing to tsdb: {exception}')
+ raise TimeseriesWriteException
+ # Elasticsearch automatically refreshes indices every second
+ # but we can't wait that long especially for tests
+ self.get_db.indices.refresh(index=name)
+
+ def read(self, key, fields, tags=None, limit=1, order='time', **kwargs):
+ """ ``since`` should be of the form 'now() - s' """
+ extra_fields = kwargs.get('extra_fields')
+ time_format = kwargs.get('time_format')
+ since = kwargs.get('since')
+ try:
+ metric_id, index = find_metric(self.get_db, key, tags)
+ except TypeError:
+ return []
+ metric_index = MetricDocument.get(index=index, id=metric_id, using=self.get_db)
+ # distinguish between traffic and clients
+ points = []
+ for point in list(metric_index.points):
+ if fields in point.fields.to_dict():
+ points.append(point)
+ if order == 'time':
+ points = points[0:limit]
+ elif order == '-time':
+ points = list(reversed(points))[0:limit]
+ else:
+ raise self.client_error(
+ f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
+ 'result sorted in ascending /descending order respectively.'
+ )
+ if extra_fields and extra_fields != '*':
+ assert isinstance(extra_fields, list)
+ for count, point in enumerate(points):
+ fields_dict = point.to_dict()['fields']
+ point = {
+ 'time': self._format_time(point['time'], time_format),
+ fields: fields_dict[fields],
+ }
+ for extra_field in extra_fields:
+ if fields_dict.get(extra_field) is not None:
+ point.update({extra_field: fields_dict[extra_field]})
+ points[count] = point
+ elif extra_fields == '*':
+ for count, point in enumerate(points):
+ points[count] = deep_merge_dicts(
+ point.fields.to_dict(),
+ {'time': self._format_time(point.time, time_format)},
+ )
+ else:
+ points = [
+ deep_merge_dicts(
+ {fields: p.fields.to_dict()[fields]},
+ {'time': self._format_time(p.time, time_format)},
+ )
+ for p in points
+ ]
+ if since:
+ since = datetime.now().timestamp() - int(re.search(r'\d+', since).group())
+ points = [point for point in points if point['time'] >= since]
+ return points
+
+ def _format_time(self, obj, time_format=None):
+ """ returns datetime object in isoformat / unix timestamp and UTC timezone """
+ if time_format == 'isoformat':
+ return obj.astimezone(tz=tz('UTC')).isoformat(timespec='seconds')
+ return int(obj.astimezone(tz=tz('UTC')).timestamp())
+
+ def get_list_query(self, query, precision='s', key=None):
+ response = self.query(query, key)
+ try:
+ points = response['aggregations']['GroupByTime']['set_range']['time'][
+ 'buckets'
+ ]
+ list_points = self._fill_points(
+ query, [self._format(point, precision) for point in points],
+ )
+ list_points.reverse()
+ except KeyError:
+ return []
+ return list_points
+
+ def _fill_points(self, query, points):
+ _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']
+ # if not _range or not points:
+ # return points
+ days = int(_range['filter']['range']['points.time']['from'][4:-3])
+ # return if summary query
+ try:
+ interval = _range['aggs']['time']['date_histogram']['fixed_interval']
+ except KeyError:
+ return points
+ # return if top_fields query
+ if interval == '365d':
+ return points
+ interval_dict = {'10m': 600, '20m': 1200, '1h': 3600, '24h': 86400}
+ interval = interval_dict[interval]
+ start_time = datetime.now()
+ end_time = start_time - timedelta(days=days) # include today
+ dummy_point = deepcopy(points[0])
+ start_ts = points[0]['time'] + interval
+ end_ts = points[-1]['time'] - interval
+ for field in dummy_point.keys():
+ dummy_point[field] = None if field != 'wifi_clients' else 0
+ while start_ts < start_time.timestamp():
+ dummy_point['time'] = start_ts
+ points.insert(0, deepcopy(dummy_point))
+ start_ts += interval
+ # TODO: This is required due to time_zone issues
+ while points[-1]['time'] < end_time.timestamp():
+ points.pop(-1)
+ while end_ts > end_time.timestamp():
+ dummy_point['time'] = end_ts
+ points.append(deepcopy(dummy_point))
+ end_ts -= interval
+ return points
+
+ def delete_metric_data(self, key=None, tags=None):
+ """
+ deletes a specific metric based on given key and tags;
+ deletes all metrics if neither provided
+ """
+ if key and tags:
+ metric_id, index = find_metric(self.get_db, key, tags)
+ self.get_db.delete(index=index, id=metric_id)
+ elif key:
+ self.get_db.indices.delete_alias(
+ index=f'{key}-*', name=key, ignore=[400, 404]
+ )
+ else:
+ self.get_db.indices.delete(index='*', ignore=[400, 404])
+ # TODO: Speed up tests by retaining indices, almost 50s difference on travis
+ # try:
+ # self.get_db.delete_by_query(index='_all', body={'query': {'match_all': {}}})
+ # except self.client_error:
+ # pass
+
+ # Chart related functions below
+
+ def validate_query(self, query):
+ if not isinstance(query, dict):
+ raise ValidationError(
+ {'configuration': f'error parsing query: found {query}'}
+ )
+ # Elasticsearch currently supports validation of only query section,
+ # aggs, size, _source etc. are not supported
+ try:
+ valid_check = self.get_db.indices.validate_query(
+ body={'query': query['query']}, explain=True
+ )
+ except NotFoundError:
+ return True
+ # Show a helpful message for failure
+ if not valid_check['valid']:
+ raise ValidationError(valid_check['error'])
+ return self._is_aggregate(query)
+
+ def _is_aggregate(self, query):
+ agg_dict = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][
+ 'time'
+ ]['aggs']['nest']['nested']['aggs'].values()
+ agg = []
+ for item in agg_dict:
+ agg.append(next(iter(item)))
+ is_aggregate = True if set(agg) <= set(self._AGGREGATE) else False
+ return is_aggregate if 'size' in query else False
+
+ def get_query(
+ self,
+ chart_type,
+ params,
+ time,
+ group_map,
+ summary=False,
+ fields=None,
+ query=None,
+ timezone=settings.TIME_ZONE,
+ ):
+ if not params.get('object_id'):
+ del query['query']
+ query = json.dumps(query)
+ for k, v in params.items():
+ query = query.replace('{' + k + '}', v)
+ query = self._group_by(query, time, chart_type, group_map, strip=summary)
+ set_range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs'][
+ 'time'
+ ]
+ if fields:
+ aggregate_dict = set_range['aggs']['nest']['nested']['aggs']
+ agg = deepcopy(aggregate_dict).popitem()[1].popitem()[0]
+ aggregate_dict.update(
+ {
+ f'{field}': {agg: {'field': f'points.fields.{field}'}}
+ for field in fields
+ }
+ )
+ try:
+ set_range['date_histogram']['time_zone'] = timezone
+ except KeyError:
+ pass
+ return query
+
+ def _group_by(self, query, time, chart_type, group_map, strip=False):
+ query = query.replace('1d/d', f'{time}/d')
+ if not strip and not chart_type == 'histogram':
+ value = group_map[time]
+ query = query.replace('10m', value)
+ if strip:
+ query = json.loads(query)
+ _range = query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']
+ _range['time'].pop('date_histogram')
+ _range['time']['auto_date_histogram'] = {
+ 'field': 'points.time',
+ 'format': 'date_time_no_millis',
+ 'buckets': 1,
+ }
+ if isinstance(query, str):
+ query = json.loads(query)
+ return query
+
+ def _get_top_fields(
+ self,
+ params,
+ chart_type,
+ group_map,
+ number,
+ query=None,
+ timezone=settings.TIME_ZONE,
+ get_fields=True,
+ **kwargs,
+ ):
+ """
+ Returns top fields if ``get_fields`` set to ``True`` (default)
+ else it returns points containing the top fields.
+ """
+ try:
+ response = self.get_db.indices.get_mapping(index=params['key'])
+ except NotFoundError:
+ return []
+ fields = [
+ k
+ for k, v in list(response.values())[0]['mappings']['properties']['points'][
+ 'properties'
+ ]['fields']['properties'].items()
+ ]
+ query = self.get_query(
+ chart_type=chart_type,
+ params=params,
+ time='365d',
+ group_map=group_map,
+ summary=True,
+ fields=fields,
+ query=query,
+ timezone=timezone,
+ )
+ point = self.get_list_query(query, key=params['key'])[0]
+ time = point.pop('time')
+ point = Counter(point).most_common(number)
+ if get_fields:
+ return [k for k, v in point]
+ points = [{'time': time}]
+ for k, v in point:
+ points[0][k] = v
+ return points
+
+ def _format(self, point, precision='s'):
+ """ allowed values for precision are ``s`` and ``ms`` """
+ pt = {}
+ # By default values returned are in millisecond precision
+ if precision == 'ms':
+ pt['time'] = point['key'] / 1000
+ else:
+ pt['time'] = point['key'] // 1000
+ for key, value in point.items():
+ if isinstance(value, dict):
+ for k, v in value.items():
+ if isinstance(v, dict):
+ pt[k] = self._transform_field(k, v['value'])
+ return pt
+
+ def _transform_field(self, field, value):
+ """ Performs arithmetic operations on the field if required """
+ if value is None:
+ return value
+ if field in math_map:
+ op = operator_lookup.get(math_map[field]['operator'])
+ if op is not None:
+ value = op(value, math_map[field]['value'])
+ return value
+
+ def default_chart_query(self, tags=False):
+ q = deepcopy(default_chart_query)
+ # This is used to distinguish that it's default query
+ del q['size']
+ if not tags:
+ q['query']['nested']['query']['bool']['must'] = []
+ return q
+
+ def _device_data(self, key, tags, fields, **kwargs):
+ """ returns last snapshot of ``device_data`` """
+ return self.read(
+ key=key, fields=fields, tags=tags, order='-time', time_format='isoformat',
+ )
+
+
+# TODO:
+# _fill_points has a while which shouldn't be required
diff --git a/openwisp_monitoring/db/backends/elasticsearch/index.py b/openwisp_monitoring/db/backends/elasticsearch/index.py
new file mode 100644
index 000000000..ae6b6aad4
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/index.py
@@ -0,0 +1,92 @@
+import uuid
+
+from django.conf import settings
+from elasticsearch.exceptions import NotFoundError
+from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search
+
+
+class Point(InnerDoc):
+ time = Date(required=True, default_timezone=settings.TIME_ZONE)
+ fields = Nested(dynamic=True, required=True, multi=True)
+
+
+class MetricDocument(Document):
+ tags = Nested(dynamic=True, required=False, multi=True)
+ points = Nested(Point)
+
+ class Index:
+ name = 'metric'
+ settings = {
+ 'number_of_shards': 1,
+ 'number_of_replicas': 0,
+ 'lifecycle.name': 'default',
+ 'lifecycle.rollover_alias': 'metric',
+ }
+
+
+def find_metric(client, index, tags, retention_policy=None, add=False):
+ search = Search(using=client, index=index)
+ if tags:
+ tags_dict = dict()
+ for key, value in tags.items():
+ tags_dict[f'tags.{key}'] = value
+ q = Q(
+ 'nested',
+ path='tags',
+ query=Q(
+ 'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()]
+ ),
+ )
+ else:
+ q = Q()
+ try:
+ result = list(search.query(q).execute())[0].meta
+ return result['id'], result['index']
+ except (NotFoundError, AttributeError, IndexError):
+ if add:
+ document = create_document(
+ client, index, tags, retention_policy=retention_policy
+ )
+ return document['_id'], document['_index']
+ return None
+
+
+def create_document(client, key, tags, _id=None, retention_policy=None):
+ """
+ Adds document to relevant index using ``keys``, ``tags`` and ``id`` provided.
+ If no ``id`` is provided a random ``uuid`` would be used.
+ """
+ _id = str(_id or uuid.uuid1())
+ # If index exists, create the document and return
+ try:
+ index_aliases = client.indices.get_alias(index=key)
+ for k, v in index_aliases.items():
+ if v['aliases'][key]['is_write_index']:
+ break
+ client.create(index=k, id=_id, body={'tags': tags})
+ return {'_id': _id, '_index': k}
+ except NotFoundError:
+ pass
+ # Create a new index if it doesn't exist
+ name = f'{key}-000001'
+ document = MetricDocument(meta={'id': _id})
+ document._index = document._index.clone(name)
+ # Create a new index template if it doesn't exist
+ if not client.indices.exists_template(name=key):
+ document._index.settings(**{'lifecycle.rollover_alias': key})
+ if retention_policy:
+ document._index.settings(**{'lifecycle.name': retention_policy})
+ # add index pattern is added for Index Lifecycle Management
+ document._index.as_template(key, f'{key}-*').save(using=client)
+ document.init(using=client, index=name)
+ document.meta.index = name
+ document.tags = tags
+ document.save(using=client, index=name)
+ client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
+ if retention_policy:
+ client.indices.put_settings(
+ body={'lifecycle.name': retention_policy}, index=name
+ )
+ client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
+ client.indices.refresh(index=key)
+ return document.to_dict(include_meta=True)
diff --git a/openwisp_monitoring/db/backends/elasticsearch/queries.py b/openwisp_monitoring/db/backends/elasticsearch/queries.py
new file mode 100644
index 000000000..bdae617eb
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/queries.py
@@ -0,0 +1,130 @@
+import operator
+from copy import deepcopy
+
+from openwisp_utils.utils import deep_merge_dicts
+
+from .settings import ADDITIONAL_CHART_OPERATIONS
+
+default_chart_query = {
+ 'query': {
+ 'nested': {
+ 'path': 'tags',
+ 'query': {
+ 'bool': {
+ 'must': [
+ {'match': {'tags.object_id': {'query': '{object_id}'}}},
+ {'match': {'tags.content_type': {'query': '{content_type}'}}},
+ ]
+ }
+ },
+ },
+ },
+ '_source': False,
+ 'size': 0,
+ 'aggs': {
+ 'GroupByTime': {
+ 'nested': {
+ 'path': 'points',
+ 'aggs': {
+ 'set_range': {
+ 'filter': {
+ 'range': {
+ 'points.time': {'from': 'now-1d/d', 'to': 'now/d'}
+ }
+ },
+ 'aggs': {
+ 'time': {
+ 'date_histogram': {
+ 'field': 'points.time',
+ 'fixed_interval': '10m',
+ 'format': 'date_time_no_millis',
+ 'order': {'_key': 'desc'},
+ },
+ 'aggs': {
+ 'nest': {
+ 'nested': {
+ 'path': 'points.fields',
+ 'aggs': {
+ '{field_name}': {
+ 'avg': {
+ 'field': 'points.fields.{field_name}'
+ }
+ }
+ },
+ }
+ },
+ },
+ },
+ },
+ }
+ },
+ }
+ }
+ },
+}
+
+math_map = {
+ 'uptime': {'operator': '*', 'value': 100},
+ 'memory_usage': {'operator': '*', 'value': 100},
+ 'CPU_load': {'operator': '*', 'value': 100},
+ 'disk_usage': {'operator': '*', 'value': 100},
+ 'upload': {'operator': '/', 'value': 1000000000},
+ 'download': {'operator': '/', 'value': 1000000000},
+}
+
+operator_lookup = {
+ '+': operator.add,
+ '-': operator.sub,
+ '*': operator.mul,
+ '/': operator.truediv,
+}
+
+if ADDITIONAL_CHART_OPERATIONS:
+ assert isinstance(ADDITIONAL_CHART_OPERATIONS, dict)
+ for value in ADDITIONAL_CHART_OPERATIONS.values():
+ assert value['operator'] in operator_lookup
+ assert isinstance(value['value'], (int, float))
+ math_map = deep_merge_dicts(math_map, ADDITIONAL_CHART_OPERATIONS)
+
+
+def _make_query(aggregation=None):
+ query = deepcopy(default_chart_query)
+ if aggregation:
+ query['aggs']['GroupByTime']['nested']['aggs']['set_range']['aggs']['time'][
+ 'aggs'
+ ]['nest']['nested']['aggs'] = aggregation
+ return query
+
+
+def _get_chart_query():
+ aggregation_dict = {
+ 'uptime': {'uptime': {'avg': {'field': 'points.fields.reachable'}}},
+ 'packet_loss': {'packet_loss': {'avg': {'field': 'points.fields.loss'}}},
+ 'rtt': {
+ 'RTT_average': {'avg': {'field': 'points.fields.rtt_avg'}},
+ 'RTT_max': {'avg': {'field': 'points.fields.rtt_max'}},
+ 'RTT_min': {'avg': {'field': 'points.fields.rtt_min'}},
+ },
+ 'traffic': {
+ 'upload': {'sum': {'field': 'points.fields.tx_bytes'}},
+ 'download': {'sum': {'field': 'points.fields.rx_bytes'}},
+ },
+ 'wifi_clients': {
+ 'wifi_clients': {
+ 'cardinality': {
+ 'field': 'points.fields.{field_name}.keyword',
+ 'missing': 0,
+ }
+ }
+ },
+ 'memory': {'memory_usage': {'avg': {'field': 'points.fields.percent_used'}}},
+ 'cpu': {'CPU_load': {'avg': {'field': 'points.fields.cpu_usage'}}},
+ 'disk': {'disk_usage': {'avg': {'field': 'points.fields.used_disk'}}},
+ }
+ query = {}
+ for key, value in aggregation_dict.items():
+ query[key] = {'elasticsearch': _make_query(value)}
+ return query
+
+
+chart_query = _get_chart_query()
diff --git a/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py
new file mode 100644
index 000000000..d4269959e
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py
@@ -0,0 +1,41 @@
+# By default age is calculated from the date the index is created but if the
+# index has been rolled over than the rollover date is used to calculate the age
+
+default_rp_policy = {
+ 'policy': {
+ 'phases': {
+ 'hot': {
+ 'actions': {
+ 'rollover': {'max_age': '30d', 'max_size': '90G'},
+ 'set_priority': {'priority': 100},
+ }
+ },
+ 'warm': {
+ 'min_age': '30d',
+ 'actions': {
+ 'forcemerge': {'max_num_segments': 1},
+ 'allocate': {'number_of_replicas': 0},
+ 'set_priority': {'priority': 50},
+ },
+ },
+ 'cold': {'min_age': '150d', 'actions': {'freeze': {}}},
+ 'delete': {'min_age': '335d', 'actions': {'delete': {}}},
+ }
+ }
+}
+
+
+def _make_policy(max_age):
+ return {
+ 'policy': {
+ 'phases': {
+ 'hot': {
+ 'actions': {
+ 'rollover': {'max_age': max_age},
+ 'set_priority': {'priority': 100},
+ }
+ },
+ 'delete': {'actions': {'delete': {}}},
+ }
+ }
+ }
diff --git a/openwisp_monitoring/db/backends/elasticsearch/settings.py b/openwisp_monitoring/db/backends/elasticsearch/settings.py
new file mode 100644
index 000000000..8bb6f1159
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/settings.py
@@ -0,0 +1,5 @@
+from django.conf import settings
+
+ADDITIONAL_CHART_OPERATIONS = getattr(
+ settings, 'OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS', {}
+)
diff --git a/openwisp_monitoring/db/utils.py b/openwisp_monitoring/db/backends/elasticsearch/tests/__init__.py
similarity index 100%
rename from openwisp_monitoring/db/utils.py
rename to openwisp_monitoring/db/backends/elasticsearch/tests/__init__.py
diff --git a/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py
new file mode 100644
index 000000000..0f4f72f7c
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py
@@ -0,0 +1,296 @@
+from datetime import datetime, timedelta
+from importlib import reload
+from unittest.mock import patch
+
+from celery.exceptions import Retry
+from django.conf import settings
+from django.core.exceptions import ValidationError
+from django.utils.timezone import now
+from elasticsearch.exceptions import ElasticsearchException
+from freezegun import freeze_time
+from pytz import timezone as tz
+
+from openwisp_monitoring.device.settings import SHORT_RETENTION_POLICY
+from openwisp_monitoring.device.tests import DeviceMonitoringTestCase
+from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy
+
+from ....exceptions import TimeseriesWriteException
+from ... import timeseries_db
+from .. import queries as queries_module
+from ..index import MetricDocument
+
+
+class TestDatabaseClient(DeviceMonitoringTestCase):
+ def test_get_query_fields_function(self):
+ c = self._create_chart(test_data=None, configuration='histogram')
+ q = c.get_query(fields=['ssh', 'http2', 'apple-music'])
+ self.assertIn("'ssh': {'sum': {'field': 'points.fields.ssh'}}", str(q))
+ self.assertIn("'http2': {'sum': {'field': 'points.fields.http2'}}", str(q))
+ self.assertIn(
+ "'apple-music': {'sum': {'field': 'points.fields.apple-music'}}", str(q)
+ )
+
+ def test_default_query(self):
+ c = self._create_chart(test_data=False)
+ q = timeseries_db.default_chart_query(tags=True)
+ self.assertEqual(c.query, q)
+
+ def test_write(self):
+ timeseries_db.write('test_write', dict(value=2))
+ measurement = timeseries_db.read(key='test_write', fields='value')[0]
+ self.assertEqual(measurement['value'], 2)
+
+ def test_general_write(self):
+ m = self._create_general_metric(name='Sync test')
+ m.write(1)
+ measurement = timeseries_db.read(key='sync_test', fields='value')[0]
+ self.assertEqual(measurement['value'], 1)
+
+ def test_object_write(self):
+ om = self._create_object_metric()
+ om.write(3)
+ measurement = timeseries_db.read(
+ key='test_metric', fields='value', tags=om.tags
+ )[0]
+ self.assertEqual(measurement['value'], 3)
+
+ def test_general_same_key_different_fields(self):
+ down = self._create_general_metric(
+ name='traffic (download)', key='traffic', field_name='download'
+ )
+ down.write(200)
+ up = self._create_general_metric(
+ name='traffic (upload)', key='traffic', field_name='upload'
+ )
+ up.write(100)
+ measurement = timeseries_db.read(key='traffic', fields='download')[0]
+ self.assertEqual(measurement['download'], 200)
+ measurement = timeseries_db.read(key='traffic', fields='upload')[0]
+ self.assertEqual(measurement['upload'], 100)
+
+ def test_object_same_key_different_fields(self):
+ user = self._create_user()
+ user_down = self._create_object_metric(
+ name='traffic (download)',
+ key='traffic',
+ field_name='download',
+ content_object=user,
+ )
+ user_down.write(200)
+ user_up = self._create_object_metric(
+ name='traffic (upload)',
+ key='traffic',
+ field_name='upload',
+ content_object=user,
+ )
+ user_up.write(100)
+ measurement = timeseries_db.read(
+ key='traffic', fields='download', tags=user_down.tags
+ )[0]
+ self.assertEqual(measurement['download'], 200)
+ measurement = timeseries_db.read(
+ key='traffic', fields='upload', tags=user_up.tags
+ )[0]
+ self.assertEqual(measurement['upload'], 100)
+
+ def test_get_query_1d(self):
+ c = self._create_chart(test_data=None, configuration='uptime')
+ q = c.get_query(time='1d')
+ time_map = c.GROUP_MAP['1d']
+ self.assertIn(
+ "{'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}}", str(q)
+ )
+ self.assertIn(f"'fixed_interval': '{time_map}'", str(q))
+
+ def test_get_query_30d(self):
+ c = self._create_chart(test_data=None, configuration='uptime')
+ q = c.get_query(time='30d')
+ time_map = c.GROUP_MAP['30d']
+ self.assertIn(
+ "{'range': {'points.time': {'from': 'now-30d/d', 'to': 'now/d'}}}", str(q)
+ )
+ self.assertIn(f"'fixed_interval': '{time_map}'", str(q))
+
+ def test_retention_policy(self):
+ manage_short_retention_policy()
+ rp = timeseries_db.get_list_retention_policies()
+ assert 'default' in rp
+ assert SHORT_RP in rp
+ days = f'{int(SHORT_RETENTION_POLICY.split("h")[0]) // 24}d'
+ self.assertEqual(
+ rp['short']['policy']['phases']['hot']['actions']['rollover']['max_age'],
+ days,
+ )
+
+ def test_get_query(self):
+ c = self._create_chart(test_data=False)
+ m = c.metric
+ params = dict(
+ field_name=m.field_name,
+ key=m.key,
+ content_type=m.content_type_key,
+ object_id=m.object_id,
+ time=c.DEFAULT_TIME,
+ )
+ expected = timeseries_db.get_query(
+ c.type,
+ params,
+ c.DEFAULT_TIME,
+ c.GROUP_MAP,
+ query=c.query,
+ timezone=settings.TIME_ZONE,
+ )
+ self.assertEqual(c.get_query(), expected)
+
+ def test_query_no_index(self):
+ timeseries_db.delete_metric_data(key='ping')
+ c = self._create_chart(test_data=False)
+ q = c.get_query()
+ self.assertEqual(timeseries_db.query(q, index='ping'), {})
+ self.assertEqual(timeseries_db.get_list_query(q), [])
+
+ def test_1d_chart_data(self):
+ c = self._create_chart()
+ data = c.read(time='1d')
+ self.assertIn('x', data)
+ self.assertEqual(len(data['x']), 144)
+ self.assertIn('traces', data)
+ self.assertEqual(9.0, data['traces'][0][1][-1])
+ # Test chart with old data has same length
+ m = self._create_general_metric(name='dummy')
+ c = self._create_chart(metric=m, test_data=False)
+ m.write(6.0, time=now() - timedelta(hours=23))
+ data = c.read(time='1d')
+ self.assertIn('x', data)
+ self.assertEqual(len(data['x']), 144)
+ self.assertIn('traces', data)
+ self.assertIn(6.0, data['traces'][0][1])
+
+ def test_delete_metric_data(self):
+ obj = self._create_user()
+ om = self._create_object_metric(name='Logins', content_object=obj)
+ om.write(100)
+ self.assertEqual(om.read()[0]['value'], 100)
+ timeseries_db.delete_metric_data(key=om.key, tags=om.tags)
+
+ def test_invalid_query(self):
+ q = timeseries_db.default_chart_query()
+ q['query']['nested']['query']['must'] = 'invalid'
+ try:
+ timeseries_db.validate_query(q)
+ except ValidationError as e:
+ self.assertIn('ParsingException: [bool] malformed query', str(e))
+
+ def test_non_aggregation_query(self):
+ q = {'query': timeseries_db.default_chart_query()['query']}
+ self.assertEqual(timeseries_db.get_list_query(q), [])
+
+ def test_timestamp_precision(self):
+ c = self._create_chart()
+ points = timeseries_db.get_list_query(c.get_query(), precision='ms')
+ self.assertIsInstance(points[0]['time'], float)
+ points = timeseries_db.get_list_query(c.get_query(), precision='s')
+ self.assertIsInstance(points[0]['time'], int)
+
+ def create_docs_single_index(self):
+ m = self._create_object_metric(name='dummy')
+ m.write(1)
+ d = self._create_device(organization=self._create_org())
+ m2 = self._create_object_metric(name='dummy', content_object=d)
+ m2.write(1)
+ self.assertEqual(len(timeseries_db.get_db.indices.get_alias(name='dummy')), 1)
+
+ def test_additional_chart_operations_setting(self):
+ modify_operators = {
+ 'upload': {'operator': '/', 'value': 1000000},
+ 'download': {'operator': '/', 'value': 1000000},
+ }
+ path = 'openwisp_monitoring.db.backends.elasticsearch.queries.ADDITIONAL_CHART_OPERATIONS'
+ with patch.dict(path, modify_operators, clear=True):
+ queries = reload(queries_module)
+ self.assertEqual(queries.ADDITIONAL_CHART_OPERATIONS, modify_operators)
+ self.assertEqual(queries.math_map['upload'], modify_operators['upload'])
+ self.assertEqual(queries.math_map['download'], modify_operators['download'])
+
+ def test_read(self):
+ c = self._create_chart()
+ data = c.read()
+ key = c.metric.field_name
+ self.assertIn('x', data)
+ self.assertIn('traces', data)
+ self.assertEqual(len(data['x']), 168)
+ charts = data['traces']
+ self.assertEqual(charts[0][0], key)
+ self.assertEqual(len(charts[0][1]), 168)
+ self.assertTrue(all(elem in charts[0][1] for elem in [3, 6, 9]))
+
+ def test_read_multiple(self):
+ c = self._create_chart(test_data=None, configuration='multiple_test')
+ m1 = c.metric
+ m2 = self._create_object_metric(
+ name='test metric 2',
+ key='test_metric',
+ field_name='value2',
+ content_object=m1.content_object,
+ )
+ now_ = now()
+ for n in range(0, 3):
+ time = now_ - timedelta(days=n)
+ m1.write(n + 1, time=time)
+ m2.write(n + 2, time=time)
+ data = c.read()
+ f1 = m1.field_name
+ f2 = 'value2'
+ self.assertIn('x', data)
+ self.assertIn('traces', data)
+ self.assertEqual(len(data['x']), 168)
+ charts = data['traces']
+ self.assertIn(f1, charts[0][0])
+ self.assertIn(f2, charts[1][0])
+ self.assertEqual(len(charts[0][1]), 168)
+ self.assertEqual(len(charts[1][1]), 168)
+ self.assertTrue(all(elem in charts[0][1] for elem in [3, 2, 1]))
+ self.assertTrue(all(elem in charts[1][1] for elem in [4, 3, 2]))
+
+ def test_ilm_disabled(self):
+ with patch.object(timeseries_db, 'ilm_enabled', False):
+ self.assertFalse(timeseries_db.ilm_enabled)
+ self.assertIsNone(
+ timeseries_db.create_or_alter_retention_policy(name='default')
+ )
+ self.assertIsNone(timeseries_db.get_list_retention_policies())
+
+ @patch.object(MetricDocument, 'get', side_effect=ElasticsearchException)
+ def test_write_retry(self, mock_write):
+ with self.assertRaises(TimeseriesWriteException):
+ timeseries_db.write('test_write', {'value': 1})
+ m = self._create_general_metric(name='Test metric')
+ with self.assertRaises(Retry):
+ m.write(1)
+
+ @patch.object(MetricDocument, 'get', side_effect=ElasticsearchException)
+ def test_timeseries_write_params(self, mock_write):
+ with freeze_time('Jan 14th, 2020') as frozen_datetime:
+ m = self._create_general_metric(name='Test metric')
+ with self.assertRaises(Retry) as e:
+ m.write(1)
+ frozen_datetime.tick(delta=timedelta(minutes=10))
+ self.assertEqual(
+ now(), datetime(2020, 1, 14, tzinfo=tz('UTC')) + timedelta(minutes=10)
+ )
+ task_signature = e.exception.sig
+ with patch.object(timeseries_db, 'write') as mock_write:
+ self._retry_task(task_signature)
+ mock_write.assert_called_with(
+ 'test_metric',
+ {'value': 1},
+ database=None,
+ retention_policy=None,
+ tags={},
+ # this should be the original time at the moment of first failure
+ timestamp='2020-01-14T00:00:00Z',
+ )
+
+ def _retry_task(self, task_signature):
+ task_kwargs = task_signature.kwargs
+ task_signature.type.run(**task_kwargs)
diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py
index ab443056a..ae67d37da 100644
--- a/openwisp_monitoring/db/backends/influxdb/client.py
+++ b/openwisp_monitoring/db/backends/influxdb/client.py
@@ -52,7 +52,6 @@ class DatabaseClient(object):
backend_name = 'influxdb'
def __init__(self, db_name=None):
- self._db = None
self.db_name = db_name or TIMESERIES_DB['NAME']
self.client_error = InfluxDBClientError
@@ -164,7 +163,7 @@ def read(self, key, fields, tags, **kwargs):
q = f'{q} LIMIT {limit}'
return list(self.query(q, precision='s').get_points())
- def get_list_query(self, query, precision='s'):
+ def get_list_query(self, query, precision='s', **kwargs):
return list(self.query(query, precision=precision).get_points())
def get_list_retention_policies(self):
@@ -269,6 +268,7 @@ def __transform_field(self, field, function, operation=None):
def _get_top_fields(
self,
+ default_query,
query,
params,
chart_type,
@@ -276,9 +276,15 @@ def _get_top_fields(
number,
time,
timezone=settings.TIME_ZONE,
+ get_fields=True,
):
+ """
+ Returns top fields if ``get_fields`` set to ``True`` (default)
+ else it returns points containing the top fields.
+ """
+ q = default_query.replace('{field_name}', '{fields}')
q = self.get_query(
- query=query,
+ query=q,
params=params,
chart_type=chart_type,
group_map=group_map,
@@ -287,7 +293,7 @@ def _get_top_fields(
time=time,
timezone=timezone,
)
- res = list(self.query(q, precision='s').get_points())
+ res = self.get_list_query(q)
if not res:
return []
res = res[0]
@@ -297,4 +303,31 @@ def _get_top_fields(
keys = list(sorted_dict.keys())
keys.reverse()
top = keys[0:number]
- return [item.replace('sum_', '') for item in top]
+ top_fields = [item.replace('sum_', '') for item in top]
+ if get_fields:
+ return top_fields
+ query = self.get_query(
+ query=query,
+ params=params,
+ chart_type=chart_type,
+ group_map=group_map,
+ summary=True,
+ fields=top_fields,
+ time=time,
+ timezone=timezone,
+ )
+ return self.get_list_query(query)
+
+ def default_chart_query(self, tags):
+ q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'"
+ if tags:
+ q += " AND content_type = '{content_type}' AND object_id = '{object_id}'"
+ return q
+
+ def _device_data(self, key, tags, rp, **kwargs):
+ """ returns last snapshot of ``device_data`` """
+ query = (
+ f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' "
+ "ORDER BY time DESC LIMIT 1"
+ )
+ return self.get_list_query(query, precision=None)
diff --git a/openwisp_monitoring/db/backends/influxdb/queries.py b/openwisp_monitoring/db/backends/influxdb/queries.py
index 1ebc07412..47523cb6e 100644
--- a/openwisp_monitoring/db/backends/influxdb/queries.py
+++ b/openwisp_monitoring/db/backends/influxdb/queries.py
@@ -58,12 +58,3 @@
)
},
}
-
-default_chart_query = [
- "SELECT {field_name} FROM {key} WHERE time >= '{time}'",
- " AND content_type = '{content_type}' AND object_id = '{object_id}'",
-]
-
-device_data_query = (
- "SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1"
-)
diff --git a/openwisp_monitoring/db/backends/influxdb/tests/__init__.py b/openwisp_monitoring/db/backends/influxdb/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/openwisp_monitoring/db/backends/influxdb/tests.py b/openwisp_monitoring/db/backends/influxdb/tests/client_tests.py
similarity index 80%
rename from openwisp_monitoring/db/backends/influxdb/tests.py
rename to openwisp_monitoring/db/backends/influxdb/tests/client_tests.py
index bea86bdab..d22b8e446 100644
--- a/openwisp_monitoring/db/backends/influxdb/tests.py
+++ b/openwisp_monitoring/db/backends/influxdb/tests/client_tests.py
@@ -1,7 +1,8 @@
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
from unittest.mock import patch
from celery.exceptions import Retry
+from django.conf import settings
from django.core.exceptions import ValidationError
from django.test import TestCase
from django.utils.timezone import now
@@ -15,8 +16,8 @@
from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy
from openwisp_monitoring.monitoring.tests import TestMonitoringMixin
-from ...exceptions import TimeseriesWriteException
-from .. import timeseries_db
+from ....exceptions import TimeseriesWriteException
+from ... import timeseries_db
Chart = load_model('monitoring', 'Chart')
Notification = load_model('openwisp_notifications', 'Notification')
@@ -46,16 +47,6 @@ def test_get_custom_query(self):
q = c.get_query(query=custom_q, fields=['SUM(*)'])
self.assertIn('SELECT SUM(*) FROM', q)
- def test_is_aggregate_bug(self):
- m = self._create_object_metric(name='summary_avg')
- c = Chart(metric=m, configuration='dummy')
- self.assertFalse(timeseries_db._is_aggregate(c.query))
-
- def test_is_aggregate_fields_function(self):
- m = self._create_object_metric(name='is_aggregate_func')
- c = Chart(metric=m, configuration='uptime')
- self.assertTrue(timeseries_db._is_aggregate(c.query))
-
def test_get_query_fields_function(self):
c = self._create_chart(test_data=None, configuration='histogram')
q = c.get_query(fields=['ssh', 'http2', 'apple-music'])
@@ -150,21 +141,6 @@ def test_object_same_key_different_fields(self):
measurement = timeseries_db.get_list_query(q)[0]
self.assertEqual(measurement['upload'], 100)
- def test_delete_metric_data(self):
- m = self._create_general_metric(name='test_metric')
- m.write(100)
- self.assertEqual(m.read()[0]['value'], 100)
- timeseries_db.delete_metric_data(key=m.key)
- self.assertEqual(m.read(), [])
- om = self._create_object_metric(name='dummy')
- om.write(50)
- m.write(100)
- self.assertEqual(m.read()[0]['value'], 100)
- self.assertEqual(om.read()[0]['value'], 50)
- timeseries_db.delete_metric_data()
- self.assertEqual(m.read(), [])
- self.assertEqual(om.read(), [])
-
def test_get_query_1d(self):
c = self._create_chart(test_data=None, configuration='uptime')
q = c.get_query(time='1d')
@@ -197,27 +173,11 @@ def test_query_set(self):
)
self.assertEqual(c.query, expected)
self.assertEqual(
- ''.join(timeseries_db.queries.default_chart_query[0:2]), c._default_query
+ ''.join(timeseries_db.default_chart_query(tags=c.metric.tags)),
+ c._default_query,
)
c.metric.object_id = None
- self.assertEqual(timeseries_db.queries.default_chart_query[0], c._default_query)
-
- def test_read_order(self):
- m = self._create_general_metric(name='dummy')
- m.write(30)
- m.write(40, time=now() - timedelta(days=2))
- with self.subTest('Test ascending read order'):
- metric_data = m.read(limit=2, order='time')
- self.assertEqual(metric_data[0]['value'], 40)
- self.assertEqual(metric_data[1]['value'], 30)
- with self.subTest('Test descending read order'):
- metric_data = m.read(limit=2, order='-time')
- self.assertEqual(metric_data[0]['value'], 30)
- self.assertEqual(metric_data[1]['value'], 40)
- with self.subTest('Test invalid read order'):
- with self.assertRaises(timeseries_db.client_error) as e:
- metric_data = m.read(limit=2, order='invalid')
- self.assertIn('Invalid order "invalid" passed.', str(e))
+ self.assertEqual(timeseries_db.default_chart_query(tags=None), c._default_query)
def test_read_with_rp(self):
self._create_admin()
@@ -293,3 +253,59 @@ def test_timeseries_write_params(self, mock_write):
def _retry_task(self, task_signature):
task_kwargs = task_signature.kwargs
task_signature.type.run(**task_kwargs)
+
+ def test_get_query(self):
+ c = self._create_chart(test_data=False)
+ m = c.metric
+ now_ = now()
+ today = date(now_.year, now_.month, now_.day)
+ time = today - timedelta(days=6)
+ expected = c.query.format(
+ field_name=m.field_name,
+ key=m.key,
+ content_type=m.content_type_key,
+ object_id=m.object_id,
+ time=str(time),
+ )
+ expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE)
+ self.assertEqual(c.get_query(), expected)
+
+ def test_read(self):
+ c = self._create_chart()
+ data = c.read()
+ key = c.metric.field_name
+ self.assertIn('x', data)
+ self.assertIn('traces', data)
+ self.assertEqual(len(data['x']), 3)
+ charts = data['traces']
+ self.assertEqual(charts[0][0], key)
+ self.assertEqual(len(charts[0][1]), 3)
+ self.assertEqual(charts[0][1], [3, 6, 9])
+
+ def test_read_multiple(self):
+ c = self._create_chart(test_data=None, configuration='multiple_test')
+ m1 = c.metric
+ m2 = self._create_object_metric(
+ name='test metric 2',
+ key='test_metric',
+ field_name='value2',
+ content_object=m1.content_object,
+ )
+ now_ = now()
+ for n in range(0, 3):
+ time = now_ - timedelta(days=n)
+ m1.write(n + 1, time=time)
+ m2.write(n + 2, time=time)
+ data = c.read()
+ f1 = m1.field_name
+ f2 = 'value2'
+ self.assertIn('x', data)
+ self.assertIn('traces', data)
+ self.assertEqual(len(data['x']), 3)
+ charts = data['traces']
+ self.assertIn(f1, charts[0][0])
+ self.assertIn(f2, charts[1][0])
+ self.assertEqual(len(charts[0][1]), 3)
+ self.assertEqual(len(charts[1][1]), 3)
+ self.assertEqual(charts[0][1], [3, 2, 1])
+ self.assertEqual(charts[1][1], [4, 3, 2])
diff --git a/openwisp_monitoring/db/tests.py b/openwisp_monitoring/db/tests.py
new file mode 100644
index 000000000..c0103c869
--- /dev/null
+++ b/openwisp_monitoring/db/tests.py
@@ -0,0 +1,55 @@
+from datetime import timedelta
+
+from django.utils.timezone import now
+from swapper import load_model
+
+from . import timeseries_db
+from .backends import load_backend_module
+
+Chart = load_model('monitoring', 'Chart')
+tests = load_backend_module(module='tests.client_tests')
+
+
+class TestDatabaseClient(tests.TestDatabaseClient):
+ def test_is_aggregate_bug(self):
+ m = self._create_object_metric(name='summary_avg')
+ c = Chart(metric=m, configuration='dummy')
+ self.assertFalse(timeseries_db._is_aggregate(c.query))
+
+ def test_is_aggregate_fields_function(self):
+ m = self._create_object_metric(name='is_aggregate_func')
+ c = Chart(metric=m, configuration='uptime')
+ self.assertTrue(timeseries_db._is_aggregate(c.query))
+
+ def test_delete_metric_data(self):
+ m = self._create_general_metric(name='test_metric')
+ m.write(100)
+ self.assertEqual(m.read()[0]['value'], 100)
+ timeseries_db.delete_metric_data(key=m.key)
+ self.assertEqual(m.read(), [])
+ om = self._create_object_metric(name='dummy')
+ om.write(50)
+ m.write(100)
+ self.assertEqual(m.read()[0]['value'], 100)
+ self.assertEqual(om.read()[0]['value'], 50)
+ timeseries_db.delete_metric_data()
+ self.assertEqual(m.read(), [])
+ self.assertEqual(om.read(), [])
+
+ def test_read_order(self):
+ timeseries_db.delete_metric_data()
+ m = self._create_general_metric(name='dummy')
+ m.write(40, time=now() - timedelta(days=2))
+ m.write(30)
+ with self.subTest('Test ascending read order'):
+ metric_data = m.read(limit=2, order='time')
+ self.assertEqual(metric_data[0]['value'], 40)
+ self.assertEqual(metric_data[1]['value'], 30)
+ with self.subTest('Test descending read order'):
+ metric_data = m.read(limit=2, order='-time')
+ self.assertEqual(metric_data[0]['value'], 30)
+ self.assertEqual(metric_data[1]['value'], 40)
+ with self.subTest('Test invalid read order'):
+ with self.assertRaises(timeseries_db.client_error) as e:
+ metric_data = m.read(limit=2, order='invalid')
+ self.assertIn('Invalid order "invalid" passed.', str(e))
diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py
index 8b7ff22e9..385400064 100644
--- a/openwisp_monitoring/device/base/models.py
+++ b/openwisp_monitoring/device/base/models.py
@@ -22,7 +22,7 @@
from openwisp_utils.base import TimeStampedEditableModel
-from ...db import device_data_query, timeseries_db
+from ...db import timeseries_db
from ...monitoring.signals import threshold_crossed
from ...monitoring.tasks import timeseries_write
from .. import settings as app_settings
@@ -104,11 +104,12 @@ def data(self):
"""
if self.__data:
return self.__data
- q = device_data_query.format(SHORT_RP, self.__key, self.pk)
cache_key = get_device_cache_key(device=self, context='current-data')
points = cache.get(cache_key)
if not points:
- points = timeseries_db.get_list_query(q, precision=None)
+ points = timeseries_db._device_data(
+ rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data'
+ )
if not points:
return None
self.data_timestamp = points[0]['time']
diff --git a/openwisp_monitoring/device/tests/test_api.py b/openwisp_monitoring/device/tests/test_api.py
index b248f75b4..08e40d7c4 100644
--- a/openwisp_monitoring/device/tests/test_api.py
+++ b/openwisp_monitoring/device/tests/test_api.py
@@ -289,10 +289,10 @@ def test_get_device_metrics_csv(self):
'2',
'0.4',
'0.1',
- '2.0',
- '1.0',
+ '2',
+ '1',
'9.73',
- '0.0',
+ '0',
'8.27',
],
)
diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py
index 6f92ff24f..abfcdd59f 100644
--- a/openwisp_monitoring/monitoring/base/models.py
+++ b/openwisp_monitoring/monitoring/base/models.py
@@ -20,7 +20,7 @@
from openwisp_utils.base import TimeStampedEditableModel
-from ...db import default_chart_query, timeseries_db
+from ...db import timeseries_db
from ..configuration import (
CHART_CONFIGURATION_CHOICES,
DEFAULT_COLORS,
@@ -338,10 +338,8 @@ def top_fields(self):
@property
def _default_query(self):
- q = default_chart_query[0]
- if self.metric.object_id:
- q += default_chart_query[1]
- return q
+ tags = True if self.metric.object_id else False
+ return timeseries_db.default_chart_query(tags)
def get_query(
self,
@@ -362,10 +360,10 @@ def get_top_fields(self, number):
Returns list of top ``number`` of fields (highest sum) of a
measurement in the specified time range (descending order).
"""
- q = self._default_query.replace('{field_name}', '{fields}')
params = self._get_query_params(self.DEFAULT_TIME)
return timeseries_db._get_top_fields(
- query=q,
+ default_query=self._default_query,
+ query=self.get_query(),
chart_type=self.type,
group_map=self.GROUP_MAP,
number=number,
@@ -410,16 +408,23 @@ def read(
try:
query_kwargs = dict(time=time, timezone=timezone)
if self.top_fields:
- fields = self.get_top_fields(self.top_fields)
- data_query = self.get_query(fields=fields, **query_kwargs)
- summary_query = self.get_query(
- fields=fields, summary=True, **query_kwargs
+ points = summary = timeseries_db._get_top_fields(
+ default_query=self._default_query,
+ chart_type=self.type,
+ group_map=self.GROUP_MAP,
+ number=self.top_fields,
+ params=self._get_query_params(self.DEFAULT_TIME),
+ time=time,
+ query=self.query,
+ get_fields=False,
)
else:
data_query = self.get_query(**query_kwargs)
summary_query = self.get_query(summary=True, **query_kwargs)
- points = timeseries_db.get_list_query(data_query)
- summary = timeseries_db.get_list_query(summary_query)
+ points = timeseries_db.get_list_query(data_query, key=self.metric.key)
+ summary = timeseries_db.get_list_query(
+ summary_query, key=self.metric.key
+ )
except timeseries_db.client_error as e:
logging.error(e, exc_info=True)
raise e
@@ -471,7 +476,11 @@ def _round(value, decimal_places):
control = 1.0 / 10 ** decimal_places
if value < control:
decimal_places += 2
- return round(value, decimal_places)
+ value = round(value, decimal_places)
+ # added for Elasticsearch division outputs (traffic metric)
+ if isinstance(value, float) and value.is_integer():
+ value = int(value)
+ return value
class AbstractAlertSettings(TimeStampedEditableModel):
diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py
index ac943a8ca..9803a19c8 100644
--- a/openwisp_monitoring/monitoring/tests/__init__.py
+++ b/openwisp_monitoring/monitoring/tests/__init__.py
@@ -7,6 +7,7 @@
from ...db import timeseries_db
from ...db.backends import TIMESERIES_DB
+from ...db.backends.elasticsearch.queries import _make_query
from ..configuration import (
register_chart,
register_metric,
@@ -80,7 +81,10 @@
"SELECT {fields|SUM|/ 1} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
- )
+ ),
+ 'elasticsearch': _make_query(
+ {'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}}
+ ),
},
},
'dummy': {
@@ -97,7 +101,7 @@
'description': 'Bugged chart for testing purposes.',
'unit': 'bugs',
'order': 999,
- 'query': {'influxdb': "BAD"},
+ 'query': {'influxdb': "BAD", 'elasticsearch': "BAD"},
},
'default': {
'type': 'line',
@@ -109,7 +113,8 @@
'influxdb': (
"SELECT {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
- )
+ ),
+ 'elasticsearch': _make_query(),
},
},
'multiple_test': {
@@ -122,7 +127,13 @@
'influxdb': (
"SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
- )
+ ),
+ 'elasticsearch': _make_query(
+ {
+ '{field_name}': {'sum': {'field': 'points.fields.{field_name}'}},
+ 'value2': {'sum': {'field': 'points.fields.value2'}},
+ }
+ ),
},
},
'mean_test': {
@@ -135,7 +146,8 @@
'influxdb': (
"SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
- )
+ ),
+ 'elasticsearch': _make_query(),
},
},
'sum_test': {
@@ -148,7 +160,10 @@
'influxdb': (
"SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND "
"content_type = '{content_type}' AND object_id = '{object_id}'"
- )
+ ),
+ 'elasticsearch': _make_query(
+ {'{field_name}': {'sum': {'field': 'points.fields.{field_name}'}}}
+ ),
},
},
'top_fields_mean': {
@@ -162,7 +177,8 @@
"SELECT {fields|MEAN} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
- )
+ ),
+ 'elasticsearch': _make_query(),
},
},
}
diff --git a/openwisp_monitoring/monitoring/tests/test_charts.py b/openwisp_monitoring/monitoring/tests/test_charts.py
index 5adf6f33e..704e9bc36 100644
--- a/openwisp_monitoring/monitoring/tests/test_charts.py
+++ b/openwisp_monitoring/monitoring/tests/test_charts.py
@@ -2,7 +2,6 @@
from datetime import date, timedelta
from unittest.mock import patch
-from django.conf import settings
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.test import TestCase
from django.utils.timezone import now
@@ -25,18 +24,6 @@ class TestCharts(TestMonitoringMixin, TestCase):
Tests for functionalities related to charts
"""
- def test_read(self):
- c = self._create_chart()
- data = c.read()
- key = c.metric.field_name
- self.assertIn('x', data)
- self.assertIn('traces', data)
- self.assertEqual(len(data['x']), 3)
- charts = data['traces']
- self.assertEqual(charts[0][0], key)
- self.assertEqual(len(charts[0][1]), 3)
- self.assertEqual(charts[0][1], [3, 6, 9])
-
def test_read_summary_avg(self):
m = self._create_object_metric(name='summary_avg')
c = self._create_chart(metric=m, test_data=False, configuration='mean_test')
@@ -133,34 +120,6 @@ def test_read_summary_top_fields_acid(self):
self.assertEqual(data['summary'], {'google': 87500000, 'facebook': 37503000})
self.assertEqual(c.get_top_fields(2), ['google', 'facebook'])
- def test_read_multiple(self):
- c = self._create_chart(test_data=None, configuration='multiple_test')
- m1 = c.metric
- m2 = self._create_object_metric(
- name='test metric 2',
- key='test_metric',
- field_name='value2',
- content_object=m1.content_object,
- )
- now_ = now()
- for n in range(0, 3):
- time = now_ - timedelta(days=n)
- m1.write(n + 1, time=time)
- m2.write(n + 2, time=time)
- data = c.read()
- f1 = m1.field_name
- f2 = 'value2'
- self.assertIn('x', data)
- self.assertIn('traces', data)
- self.assertEqual(len(data['x']), 3)
- charts = data['traces']
- self.assertIn(f1, charts[0][0])
- self.assertIn(f2, charts[1][0])
- self.assertEqual(len(charts[0][1]), 3)
- self.assertEqual(len(charts[1][1]), 3)
- self.assertEqual(charts[0][1], [3, 2, 1])
- self.assertEqual(charts[1][1], [4, 3, 2])
-
def test_json(self):
c = self._create_chart()
data = c.read()
@@ -180,22 +139,6 @@ def test_read_bad_query(self):
else:
self.fail('ValidationError not raised')
- def test_get_query(self):
- c = self._create_chart(test_data=False)
- m = c.metric
- now_ = now()
- today = date(now_.year, now_.month, now_.day)
- time = today - timedelta(days=6)
- expected = c.query.format(
- field_name=m.field_name,
- key=m.key,
- content_type=m.content_type_key,
- object_id=m.object_id,
- time=str(time),
- )
- expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE)
- self.assertEqual(c.get_query(), expected)
-
def test_description(self):
c = self._create_chart(test_data=False)
self.assertEqual(c.description, 'Dummy chart for testing purposes.')
diff --git a/requirements.txt b/requirements.txt
index bbfb664a0..158898a6b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@ openwisp-controller @ https://github.com/openwisp/openwisp-controller/tarball/ma
# is the one dictated by openwisp-controller
influxdb~=5.3
django-celery-email~=3.0.0
+mac-vendor-lookup~=0.1
django-cache-memoize~=0.1
django-nested-admin~=3.3.2
swapper~=1.1
diff --git a/setup.py b/setup.py
index 43ca4bb97..b2a629f7f 100755
--- a/setup.py
+++ b/setup.py
@@ -55,6 +55,10 @@ def get_install_requires():
include_package_data=True,
zip_safe=False,
install_requires=get_install_requires(),
+ extras_require={
+ 'elasticsearch': ['elasticsearch-dsl>=7.0.0,<8.0.0'],
+ 'influxdb': ['influxdb>=5.2,<5.3'],
+ },
classifiers=[
'Development Status :: 3 - Alpha',
'Environment :: Web Environment',
@@ -64,7 +68,7 @@ def get_install_requires():
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Operating System :: OS Independent',
'Framework :: Django',
- 'Programming Language :: Python :: 2.7',
- 'Programming Language :: Python :: 3.4',
+ 'Programming Language :: Python :: 3.6',
+ 'Programming Language :: Python :: 3.7',
],
)
diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py
index e8ab72828..6bba45185 100644
--- a/tests/openwisp2/settings.py
+++ b/tests/openwisp2/settings.py
@@ -19,7 +19,7 @@
}
}
-TIMESERIES_DATABASE = {
+INFLUXDB_SETTINGS = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': 'openwisp',
'PASSWORD': 'openwisp',
@@ -28,6 +28,20 @@
'PORT': '8086',
}
+ELASTICSEARCH_SETTINGS = {
+ 'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
+ 'USER': 'openwisp',
+ 'PASSWORD': 'openwisp',
+ 'NAME': 'openwisp2',
+ 'HOST': os.getenv('ELASTICSEARCH_HOST', 'localhost'),
+ 'PORT': '9200',
+}
+
+if os.environ.get('elasticsearch', False):
+ TIMESERIES_DATABASE = ELASTICSEARCH_SETTINGS
+else:
+ TIMESERIES_DATABASE = INFLUXDB_SETTINGS
+
SECRET_KEY = 'fn)t*+$)ugeyip6-#txyy$5wf2ervc0d2n#h)qb)y5@ly$t*@w'
INSTALLED_APPS = [