From b2553bae2def34549e17ef79e4a734858aa263dd Mon Sep 17 00:00:00 2001 From: Hardik Jain <54471024+nepython@users.noreply.github.com> Date: Tue, 23 Jun 2020 06:12:24 +0530 Subject: [PATCH] [db] Abstracted code which deals with time series DB #65 Implements and closes #65 --- README.rst | 12 +- openwisp_monitoring/db/__init__.py | 7 + openwisp_monitoring/db/backends/__init__.py | 63 ++++ .../db/backends/influxdb/__init__.py | 0 .../db/backends/influxdb/client.py | 286 ++++++++++++++++++ .../db/backends/influxdb/exception.py | 5 + .../db/backends/influxdb/queries.py | 53 ++++ .../db/backends/influxdb/tests.py | 179 +++++++++++ openwisp_monitoring/db/utils.py | 0 openwisp_monitoring/device/base/models.py | 17 +- .../device/tests/test_models.py | 11 - openwisp_monitoring/device/utils.py | 16 +- openwisp_monitoring/monitoring/apps.py | 8 +- openwisp_monitoring/monitoring/base/models.py | 231 ++++---------- openwisp_monitoring/monitoring/charts.py | 72 +---- openwisp_monitoring/monitoring/settings.py | 6 - .../monitoring/tests/__init__.py | 20 +- .../monitoring/tests/test_charts.py | 71 +---- .../monitoring/tests/test_db_creation.py | 10 +- .../monitoring/tests/test_models.py | 67 ---- openwisp_monitoring/monitoring/utils.py | 62 ---- tests/openwisp2/settings.py | 90 +++--- 22 files changed, 741 insertions(+), 545 deletions(-) create mode 100644 openwisp_monitoring/db/__init__.py create mode 100644 openwisp_monitoring/db/backends/__init__.py create mode 100644 openwisp_monitoring/db/backends/influxdb/__init__.py create mode 100644 openwisp_monitoring/db/backends/influxdb/client.py create mode 100644 openwisp_monitoring/db/backends/influxdb/exception.py create mode 100644 openwisp_monitoring/db/backends/influxdb/queries.py create mode 100644 openwisp_monitoring/db/backends/influxdb/tests.py create mode 100644 openwisp_monitoring/db/utils.py delete mode 100644 openwisp_monitoring/monitoring/utils.py diff --git a/README.rst b/README.rst index 065d19800..b4fe04b18 100644 --- a/README.rst +++ b/README.rst @@ -94,9 +94,15 @@ Follow the setup instructions of `openwisp-controller ] # Make sure you change them in production - INFLUXDB_USER = 'openwisp' - INFLUXDB_PASSWORD = 'openwisp' - INFLUXDB_DATABASE = 'openwisp2' + # You can select one of the backends located in openwisp_monitoring.db.backends + TIMESERIES_DATABASE = { + 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', + 'USER': 'openwisp', + 'PASSWORD': 'openwisp', + 'NAME': 'openwisp2', + 'HOST': 'localhost', + 'PORT': '8086', + } ``urls.py``: diff --git a/openwisp_monitoring/db/__init__.py b/openwisp_monitoring/db/__init__.py new file mode 100644 index 000000000..063d2d8f7 --- /dev/null +++ b/openwisp_monitoring/db/__init__.py @@ -0,0 +1,7 @@ +from .backends import timeseries_db + +chart_query = timeseries_db.queries.chart_query +default_chart_query = timeseries_db.queries.default_chart_query +device_data_query = timeseries_db.queries.device_data_query + +__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query'] diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py new file mode 100644 index 000000000..ae4847acd --- /dev/null +++ b/openwisp_monitoring/db/backends/__init__.py @@ -0,0 +1,63 @@ +import logging +from importlib import import_module + +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured +from django.db import DatabaseError + +logger = logging.getLogger(__name__) + +TIMESERIES_DB = getattr(settings, 
'TIMESERIES_DATABASE', None)
+if not TIMESERIES_DB:
+    TIMESERIES_DB = {
+        'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
+        'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'),
+        'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'),
+        'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'),
+        'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'),
+        'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'),
+    }
+    logger.warning(
+        'The previous method of defining the timeseries database has been deprecated. Please refer to the docs:\n'
+        'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project',
+    )
+
+
+def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
+    """
+    Returns the database backend module given a fully qualified database backend name,
+    or raises an error if it doesn't exist or the backend is not well defined.
+    """
+    try:
+        assert 'BACKEND' in TIMESERIES_DB, 'BACKEND'
+        assert 'USER' in TIMESERIES_DB, 'USER'
+        assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
+        assert 'NAME' in TIMESERIES_DB, 'NAME'
+        assert 'HOST' in TIMESERIES_DB, 'HOST'
+        assert 'PORT' in TIMESERIES_DB, 'PORT'
+        if module:
+            return import_module(f'{backend_name}.{module}')
+        else:
+            return import_module(backend_name)
+    except AttributeError as e:
+        raise DatabaseError('No TIMESERIES_DATABASE specified in settings') from e
+    except AssertionError as e:
+        raise ImproperlyConfigured(
+            f'"{e}" field is not declared in TIMESERIES_DATABASE'
+        ) from e
+    except ImportError as e:
+        # The database backend wasn't found. Display a helpful error message
+        # listing all built-in database backends.
+        builtin_backends = ['influxdb']
+        if backend_name not in [
+            f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
+        ]:
+            raise ImproperlyConfigured(
+                f"{backend_name} isn't an available database backend.\n"
+                "Try using 'openwisp_monitoring.db.backends.XXX', where XXX is one of:\n"
+                f"{builtin_backends}"
+            ) from e
+
+
+timeseries_db = load_backend_module(module='client').DatabaseClient()
+timeseries_db.queries = load_backend_module(module='queries')
diff --git a/openwisp_monitoring/db/backends/influxdb/__init__.py b/openwisp_monitoring/db/backends/influxdb/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py
new file mode 100644
index 000000000..ef9d33689
--- /dev/null
+++ b/openwisp_monitoring/db/backends/influxdb/client.py
@@ -0,0 +1,286 @@
+import logging
+import operator
+import re
+from collections import OrderedDict
+from datetime import datetime
+
+from django.conf import settings
+from django.core.exceptions import ValidationError
+from django.utils.functional import cached_property
+from django.utils.translation import gettext_lazy as _
+from influxdb import InfluxDBClient
+
+from .. import TIMESERIES_DB
+from .exception import DatabaseException
+
+logger = logging.getLogger(__name__)
+
+
+class DatabaseClient(object):
+    _AGGREGATE = [
+        'COUNT',
+        'DISTINCT',
+        'INTEGRAL',
+        'MEAN',
+        'MEDIAN',
+        'MODE',
+        'SPREAD',
+        'STDDEV',
+        'SUM',
+        'BOTTOM',
+        'FIRST',
+        'LAST',
+        'MAX',
+        'MIN',
+        'PERCENTILE',
+        'SAMPLE',
+        'TOP',
+        'CEILING',
+        'CUMULATIVE_SUM',
+        'DERIVATIVE',
+        'DIFFERENCE',
+        'ELAPSED',
+        'FLOOR',
+        'HISTOGRAM',
+        'MOVING_AVERAGE',
+        'NON_NEGATIVE_DERIVATIVE',
+        'HOLT_WINTERS',
+    ]
+    _FORBIDDEN = ['drop', 'create', 'delete', 'alter', 'into']
+
+    def __init__(self, db_name=None):
+        self._db = None
+        self.db_name = db_name or TIMESERIES_DB['NAME']
+        self.client_error = DatabaseException.client_error
+
+    def create_database(self):
+        """ creates database if necessary """
+        db = self.get_db
+        # InfluxDB neither creates a duplicate database nor raises an error
+        # if the database already exists
+        db.create_database(self.db_name)
+        logger.debug(f'Created InfluxDB database "{self.db_name}"')
+
+    def drop_database(self):
+        """ drops database if it exists """
+        db = self.get_db
+        # InfluxDB does not raise an error if the database does not exist
+        db.drop_database(self.db_name)
+        logger.debug(f'Dropped InfluxDB database "{self.db_name}"')
+
+    @cached_property
+    def get_db(self):
+        """ Returns an ``InfluxDBClient`` instance """
+        return InfluxDBClient(
+            TIMESERIES_DB['HOST'],
+            TIMESERIES_DB['PORT'],
+            TIMESERIES_DB['USER'],
+            TIMESERIES_DB['PASSWORD'],
+            self.db_name,
+        )
+
+    def create_or_alter_retention_policy(self, name, duration):
+        """ creates or alters existing retention policy if necessary """
+        db = self.get_db
+        retention_policies = db.get_list_retention_policies()
+        exists = False
+        duration_changed = False
+        for policy in retention_policies:
+            if policy['name'] == name:
+                exists = True
+                duration_changed = policy['duration']
+                break
+        if not exists:
+            db.create_retention_policy(name=name, duration=duration, replication=1)
+        elif exists and duration_changed:
+            db.alter_retention_policy(name=name, duration=duration)
+
+    def query(self, query, precision=None, **kwargs):
+        db = self.get_db
+        database = kwargs.get('database') or self.db_name
+        return db.query(
+            query,
+            kwargs.get('params'),
+            epoch=precision,
+            expected_response_code=kwargs.get('expected_response_code') or 200,
+            database=database,
+        )
+
+    def write(self, name, values, **kwargs):
+        point = {
+            'measurement': name,
+            'tags': kwargs.get('tags'),
+            'fields': values,
+        }
+        timestamp = kwargs.get('timestamp')
+        if isinstance(timestamp, datetime):
+            timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
+        if timestamp:
+            point['time'] = timestamp
+        self.get_db.write(
+            {'points': [point]},
+            {
+                'db': kwargs.get('database') or self.db_name,
+                'rp': kwargs.get('retention_policy'),
+            },
+        )
+
+    def read(self, key, fields, tags, **kwargs):
+        extra_fields = kwargs.get('extra_fields')
+        since = kwargs.get('since')
+        order = kwargs.get('order')
+        limit = kwargs.get('limit')
+        if extra_fields and extra_fields != '*':
+            fields = ', '.join([fields] + extra_fields)
+        elif extra_fields == '*':
+            fields = '*'
+        q = f'SELECT {fields} FROM {key}'
+        conditions = []
+        if since:
+            conditions.append(f'time >= {since}')
+        if tags:
+            conditions.append(
+                ' AND '.join(["{0} = '{1}'".format(*tag) for tag in tags.items()])
+            )
+        if conditions:
+            conditions = 'WHERE %s' % ' AND '.join(conditions)
+            q = f'{q} {conditions}'
+        if order:
+            q = f'{q} ORDER BY {order}'
+        if limit:
+            q = f'{q} LIMIT {limit}'
+        return list(self.query(q, precision='s').get_points())
+
+    def get_list_query(self, query, precision='s'):
+        return list(self.query(query, precision=precision).get_points())
+
+    def get_list_retention_policies(self):
+        return self.get_db.get_list_retention_policies()
+
+    def delete_metric_data(self, key=None, tags=None):
+        """
+        deletes a specific metric based on the key and tags provided;
+        if neither is given, all metrics are deleted
+        """
+        if not key and not tags:
+            self.query('DROP SERIES FROM /.*/')
+        else:
+            self.get_db.delete_series(measurement=key, tags=tags)
+
+    # Chart related functions below
+
+    def validate_query(self, query):
+        for word in self._FORBIDDEN:
+            if word in query.lower():
+                msg = _(f'the word "{word.upper()}" is not allowed')
+                raise ValidationError({'configuration': msg})
+        return self._is_aggregate(query)
+
+    def _is_aggregate(self, q):
+        q = q.upper()
+        for word in self._AGGREGATE:
+            if any(['%s(' % word in q, '|%s}' % word in q, '|%s|' % word in q]):
+                return True
+        return False
+
+    def get_query(
+        self,
+        chart_type,
+        params,
+        time,
+        group_map,
+        summary=False,
+        fields=None,
+        query=None,
+        timezone=settings.TIME_ZONE,
+    ):
+        query = self._fields(fields, query, params['field_name'])
+        query = query.format(**params)
+        query = self._group_by(query, time, chart_type, group_map, strip=summary)
+        if summary:
+            query = f'{query} LIMIT 1'
+        return f"{query} tz('{timezone}')"
+
+    _group_by_regex = re.compile(r'GROUP BY time\(\w+\)', flags=re.IGNORECASE)
+
+    def _group_by(self, query, time, chart_type, group_map, strip=False):
+        if not self.validate_query(query):
+            return query
+        if not strip and not chart_type == 'histogram':
+            value = group_map[time]
+            group_by = f'GROUP BY time({value})'
+        else:
+            # can be empty when getting summaries
+            group_by = ''
+        if 'GROUP BY' not in query.upper():
+            query = f'{query} {group_by}'
+        else:
+            query = re.sub(self._group_by_regex, group_by, query)
+        return query
+
+    _fields_regex = re.compile(
+        r'(?P<group>\{fields\|(?P<func>\w+)(?:\|(?P<op>.*?))?\})', flags=re.IGNORECASE
+    )
+
+    def _fields(self, fields, query, field_name):
+        """
+        support substitution of {fields|<func>|<op>}
+        with <func>(field1) AS field1 <op>,
+             <func>(field2) AS field2 <op>
+        """
+        matches = re.search(self._fields_regex, query)
+        if not matches and not fields:
+            return query
+        elif matches and not fields:
+            groups = matches.groupdict()
+            fields_key = groups.get('group')
+            fields = [field_name]
+        if fields and matches:
+            groups = matches.groupdict()
+            function = groups['func']  # required
+            operation = groups.get('op')  # optional
+            fields = [self.__transform_field(f, function, operation) for f in fields]
+            fields_key = groups.get('group')
+        else:
+            fields_key = '{fields}'
+        if fields:
+            selected_fields = ', '.join(fields)
+        return query.replace(fields_key, selected_fields)
+
+    def __transform_field(self, field, function, operation=None):
+        if operation:
+            operation = f' {operation}'
+        else:
+            operation = ''
+        return f'{function}("{field}"){operation} AS {field.replace("-", "_")}'
+
+    def _get_top_fields(
+        self,
+        query,
+        params,
+        chart_type,
+        group_map,
+        number,
+        time,
+        timezone=settings.TIME_ZONE,
+    ):
+        q = self.get_query(
+            query=query,
+            params=params,
+            chart_type=chart_type,
+            group_map=group_map,
+            summary=True,
+            fields=['SUM(*)'],
+            time=time,
+            timezone=timezone,
+        )
+        res = list(self.query(q, precision='s').get_points())
+        if not res:
+            return []
+        res = res[0]
+        res = {key: value for key, value in res.items() if value is not None}
+        sorted_dict = OrderedDict(sorted(res.items(), key=operator.itemgetter(1)))
+        del 
sorted_dict['time'] + keys = list(sorted_dict.keys()) + keys.reverse() + top = keys[0:number] + return [item.replace('sum_', '') for item in top] diff --git a/openwisp_monitoring/db/backends/influxdb/exception.py b/openwisp_monitoring/db/backends/influxdb/exception.py new file mode 100644 index 000000000..479a41c40 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb/exception.py @@ -0,0 +1,5 @@ +from influxdb.exceptions import InfluxDBClientError + + +class DatabaseException(object): + client_error = InfluxDBClientError diff --git a/openwisp_monitoring/db/backends/influxdb/queries.py b/openwisp_monitoring/db/backends/influxdb/queries.py new file mode 100644 index 000000000..92ea9e496 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb/queries.py @@ -0,0 +1,53 @@ +chart_query = { + 'uptime': ( + "SELECT MEAN({field_name})*100 AS uptime FROM {key} WHERE " + "time >= '{time}' AND content_type = '{content_type}' AND " + "object_id = '{object_id}' GROUP BY time(1d)" + ), + 'packet_loss': ( + "SELECT MEAN(loss) AS packet_loss FROM {key} WHERE " + "time >= '{time}' AND content_type = '{content_type}' AND " + "object_id = '{object_id}' GROUP BY time(1d)" + ), + 'rtt': ( + "SELECT MEAN(rtt_avg) AS RTT_average, MEAN(rtt_max) AS " + "RTT_max, MEAN(rtt_min) AS RTT_min FROM {key} WHERE " + "time >= '{time}' AND content_type = '{content_type}' AND " + "object_id = '{object_id}' GROUP BY time(1d)" + ), + 'wifi_clients': ( + "SELECT COUNT(DISTINCT({field_name})) AS wifi_clients FROM {key} " + "WHERE time >= '{time}' AND content_type = '{content_type}' " + "AND object_id = '{object_id}' GROUP BY time(1d)" + ), + 'traffic': ( + "SELECT SUM(tx_bytes) / 1000000000 AS upload, " + "SUM(rx_bytes) / 1000000000 AS download FROM {key} " + "WHERE time >= '{time}' AND content_type = '{content_type}' " + "AND object_id = '{object_id}' GROUP BY time(1d)" + ), + 'memory': ( + "SELECT 100 * MEAN(percent_used) AS memory_usage " + "FROM {key} WHERE time >= '{time}' AND content_type = '{content_type}' " + "AND object_id = '{object_id}' GROUP BY time(1d)" + ), + 'cpu': ( + "SELECT 100 * MEAN(cpu_usage) AS CPU_load FROM {key} WHERE " + "time >= '{time}' AND content_type = '{content_type}' AND " + "object_id = '{object_id}' GROUP BY time(1d)" + ), + 'disk': ( + "SELECT 100 * MEAN(used_disk) AS disk_usage FROM {key} WHERE " + "time >= '{time}' AND content_type = '{content_type}' AND " + "object_id = '{object_id}' GROUP BY time(1d)" + ), +} + +default_chart_query = [ + "SELECT {field_name} FROM {key} WHERE time >= '{time}'", + " AND content_type = '{content_type}' AND object_id = '{object_id}'", +] + +device_data_query = ( + "SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1" +) diff --git a/openwisp_monitoring/db/backends/influxdb/tests.py b/openwisp_monitoring/db/backends/influxdb/tests.py new file mode 100644 index 000000000..f07937475 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb/tests.py @@ -0,0 +1,179 @@ +from datetime import timedelta + +from django.core.exceptions import ValidationError +from django.test import TestCase +from django.utils.timezone import now +from openwisp_monitoring.device.settings import SHORT_RETENTION_POLICY +from openwisp_monitoring.device.utils import SHORT_RP, manage_short_retention_policy +from openwisp_monitoring.monitoring.tests import TestMonitoringMixin +from swapper import load_model + +from .. 
import timeseries_db + +Chart = load_model('monitoring', 'Chart') + + +class TestDatabaseClient(TestMonitoringMixin, TestCase): + def test_forbidden_queries(self): + queries = [ + 'DROP DATABASE openwisp2', + 'DROP MEASUREMENT test_metric', + 'CREATE DATABASE test', + 'DELETE MEASUREMENT test_metric', + 'ALTER RETENTION POLICY policy', + 'SELECT * INTO metric2 FROM test_metric', + ] + for q in queries: + try: + timeseries_db.validate_query(q) + except ValidationError as e: + self.assertIn('configuration', e.message_dict) + else: + self.fail('ValidationError not raised') + + def test_get_custom_query(self): + c = self._create_chart(test_data=None) + custom_q = c._default_query.replace('{field_name}', '{fields}') + q = c.get_query(query=custom_q, fields=['SUM(*)']) + self.assertIn('SELECT SUM(*) FROM', q) + + def test_is_aggregate_bug(self): + m = self._create_object_metric(name='summary_avg') + c = Chart(metric=m, configuration='dummy') + self.assertFalse(timeseries_db._is_aggregate(c.query)) + + def test_is_aggregate_fields_function(self): + m = self._create_object_metric(name='is_aggregate_func') + c = Chart(metric=m, configuration='uptime') + self.assertTrue(timeseries_db._is_aggregate(c.query)) + + def test_get_query_fields_function(self): + c = self._create_chart(test_data=None, configuration='histogram') + q = c.get_query(fields=['ssh', 'http2', 'apple-music']) + expected = ( + 'SELECT SUM("ssh") / 1 AS ssh, ' + 'SUM("http2") / 1 AS http2, ' + 'SUM("apple-music") / 1 AS apple_music FROM' + ) + self.assertIn(expected, q) + + def test_default_query(self): + c = self._create_chart(test_data=False) + q = ( + "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " + "content_type = '{content_type}' AND object_id = '{object_id}'" + ) + self.assertEqual(c.query, q) + + def test_write(self): + timeseries_db.write('test_write', dict(value=2), database=self.TEST_DB) + measurement = list( + timeseries_db.query( + 'select * from test_write', database=self.TEST_DB + ).get_points() + )[0] + self.assertEqual(measurement['value'], 2) + + def test_general_write(self): + m = self._create_general_metric(name='Sync test') + m.write(1) + measurement = list(timeseries_db.query('select * from sync_test').get_points())[ + 0 + ] + self.assertEqual(measurement['value'], 1) + + def test_object_write(self): + om = self._create_object_metric() + om.write(3) + content_type = '.'.join(om.content_type.natural_key()) + q = ( + f"select * from test_metric WHERE object_id = '{om.object_id}'" + f" AND content_type = '{content_type}'" + ) + measurement = timeseries_db.get_list_query(q)[0] + self.assertEqual(measurement['value'], 3) + + def test_general_same_key_different_fields(self): + down = self._create_general_metric( + name='traffic (download)', key='traffic', field_name='download' + ) + down.write(200) + up = self._create_general_metric( + name='traffic (upload)', key='traffic', field_name='upload' + ) + up.write(100) + measurement = list( + timeseries_db.query('select download from traffic').get_points() + )[0] + self.assertEqual(measurement['download'], 200) + measurement = list( + timeseries_db.query('select upload from traffic').get_points() + )[0] + self.assertEqual(measurement['upload'], 100) + + def test_object_same_key_different_fields(self): + user = self._create_user() + user_down = self._create_object_metric( + name='traffic (download)', + key='traffic', + field_name='download', + content_object=user, + ) + user_down.write(200) + user_up = self._create_object_metric( + name='traffic (upload)', + 
key='traffic', + field_name='upload', + content_object=user, + ) + user_up.write(100) + content_type = '.'.join(user_down.content_type.natural_key()) + q = ( + f"select download from traffic WHERE object_id = '{user_down.object_id}'" + f" AND content_type = '{content_type}'" + ) + measurement = timeseries_db.get_list_query(q)[0] + self.assertEqual(measurement['download'], 200) + q = ( + f"select upload from traffic WHERE object_id = '{user_up.object_id}'" + f" AND content_type = '{content_type}'" + ) + measurement = timeseries_db.get_list_query(q)[0] + self.assertEqual(measurement['upload'], 100) + + def test_delete_metric_data(self): + m = self._create_general_metric(name='test_metric') + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + timeseries_db.delete_metric_data(key=m.key) + self.assertEqual(m.read(), []) + om = self._create_object_metric(name='dummy') + om.write(50) + m.write(100) + self.assertEqual(m.read()[0]['value'], 100) + self.assertEqual(om.read()[0]['value'], 50) + timeseries_db.delete_metric_data() + self.assertEqual(m.read(), []) + self.assertEqual(om.read(), []) + + def test_get_query_1d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='1d') + last24 = now() - timedelta(days=1) + self.assertIn(str(last24)[0:14], q) + self.assertIn('group by time(10m)', q.lower()) + + def test_get_query_30d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='30d') + last30d = now() - timedelta(days=30) + self.assertIn(str(last30d)[0:10], q) + self.assertIn('group by time(24h)', q.lower()) + + def test_retention_policy(self): + manage_short_retention_policy() + rp = timeseries_db.get_list_retention_policies() + self.assertEqual(len(rp), 2) + self.assertEqual(rp[1]['name'], SHORT_RP) + self.assertEqual(rp[1]['default'], False) + self.assertEqual(rp[1]['duration'], SHORT_RETENTION_POLICY) diff --git a/openwisp_monitoring/db/utils.py b/openwisp_monitoring/db/utils.py new file mode 100644 index 000000000..e69de29bb diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py index a430ae6d8..c8694091f 100644 --- a/openwisp_monitoring/device/base/models.py +++ b/openwisp_monitoring/device/base/models.py @@ -19,8 +19,8 @@ from openwisp_utils.base import TimeStampedEditableModel +from ...db import device_data_query, timeseries_db from ...monitoring.signals import threshold_crossed -from ...monitoring.utils import query, write from .. 
import settings as app_settings from ..schema import schema from ..signals import health_status_changed @@ -91,15 +91,12 @@ def data_user_friendly(self): @property def data(self): """ - retrieves last data snapshot from influxdb + retrieves last data snapshot from Timeseries Database """ if self.__data: return self.__data - q = ( - "SELECT data FROM {0}.{1} WHERE pk = '{2}' " - "ORDER BY time DESC LIMIT 1".format(SHORT_RP, self.__key, self.pk) - ) - points = list(query(q).get_points()) + q = device_data_query.format(SHORT_RP, self.__key, self.pk) + points = timeseries_db.get_list_query(q, precision=None) if not points: return None self.data_timestamp = points[0]['time'] @@ -164,12 +161,14 @@ def _mac_lookup(self, value): def save_data(self, time=None): """ - validates and saves data to influxdb + validates and saves data to Timeseries Database """ self.validate_data() if app_settings.MAC_VENDOR_DETECTION: self.add_mac_vendor_info() - write( + # TODO: Rename the parameters, since they might be called + # differently in the other database (eg: tags/labels) + timeseries_db.write( name=self.__key, values={'data': self.json()}, tags={'pk': self.pk}, diff --git a/openwisp_monitoring/device/tests/test_models.py b/openwisp_monitoring/device/tests/test_models.py index 0f1345315..c672dbcef 100644 --- a/openwisp_monitoring/device/tests/test_models.py +++ b/openwisp_monitoring/device/tests/test_models.py @@ -9,10 +9,7 @@ from openwisp_controller.connection.models import Credentials, DeviceConnection from openwisp_utils.tests import catch_signal -from ...monitoring.utils import get_db -from .. import settings as app_settings from ..signals import health_status_changed -from ..utils import SHORT_RP from . import DeviceMonitoringTestCase Check = load_model('check', 'Check') @@ -305,14 +302,6 @@ def test_init(self): dd = DeviceData(data=self._sample_data) self.assertEqual(dd.data, self._sample_data) - def test_retention_policy(self): - rp = get_db().get_list_retention_policies() - self.assertEqual(len(rp), 2) - self.assertEqual(rp[1]['name'], SHORT_RP) - self.assertEqual(rp[1]['default'], False) - duration = app_settings.SHORT_RETENTION_POLICY - self.assertEqual(rp[1]['duration'], duration) - def test_device_deleted(self): d = self._create_device() metric = self._create_object_metric(name='test', content_object=d,) diff --git a/openwisp_monitoring/device/utils.py b/openwisp_monitoring/device/utils.py index be63538e4..9aff3bcd4 100644 --- a/openwisp_monitoring/device/utils.py +++ b/openwisp_monitoring/device/utils.py @@ -1,4 +1,4 @@ -from ..monitoring.utils import get_db +from ..db import timeseries_db from . 
import settings as app_settings SHORT_RP = 'short' @@ -12,17 +12,5 @@ def manage_short_retention_policy(): """ creates or updates the "short" retention policy """ - db = get_db() duration = app_settings.SHORT_RETENTION_POLICY - retention_policies = db.get_list_retention_policies() - exists = False - duration_changed = False - for policy in retention_policies: - if policy['name'] == SHORT_RP: - exists = True - duration_changed = policy['duration'] - break - if not exists: - db.create_retention_policy(name=SHORT_RP, duration=duration, replication=1) - elif exists and duration_changed: - db.alter_retention_policy(name=SHORT_RP, duration=duration) + timeseries_db.create_or_alter_retention_policy(SHORT_RP, duration) diff --git a/openwisp_monitoring/monitoring/apps.py b/openwisp_monitoring/monitoring/apps.py index 4814299a2..221b11e2d 100644 --- a/openwisp_monitoring/monitoring/apps.py +++ b/openwisp_monitoring/monitoring/apps.py @@ -6,7 +6,7 @@ from openwisp_notifications.types import register_notification_type from requests.exceptions import ConnectionError -from .utils import create_database +from ..db import timeseries_db class MonitoringConfig(AppConfig): @@ -22,10 +22,10 @@ def ready(self): self.register_notification_types() def create_database(self): - # create influxdb database if doesn't exist yet + # create Timeseries database if it doesn't exist yet for attempt_number in range(1, self.max_retries + 1): try: - create_database() + timeseries_db.create_database() return except ConnectionError as e: self.warn_and_delay(attempt_number) @@ -34,7 +34,7 @@ def create_database(self): def warn_and_delay(self, attempt_number): print( - 'Got error while connecting to timeseries DB. ' + 'Got error while connecting to timeseries database. ' f'Retrying again in 3 seconds (attempt n. {attempt_number} out of 5).' 
) sleep(self.retry_delay) diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index 688068df2..8c6134bb7 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -1,8 +1,5 @@ import json import logging -import operator -import re -from collections import OrderedDict from datetime import date, datetime, timedelta from django.conf import settings @@ -16,13 +13,13 @@ from django.utils.text import slugify from django.utils.timezone import make_aware from django.utils.translation import gettext_lazy as _ -from influxdb.exceptions import InfluxDBClientError from openwisp_notifications.signals import notify from pytz import timezone as tz from swapper import get_model_name from openwisp_utils.base import TimeStampedEditableModel +from ...db import default_chart_query, timeseries_db from ..charts import ( DEFAULT_COLORS, get_chart_configuration, @@ -30,7 +27,6 @@ ) from ..exceptions import InvalidChartConfigException from ..signals import post_metric_write, pre_metric_write, threshold_crossed -from ..utils import query, write User = get_user_model() logger = logging.getLogger(__name__) @@ -97,9 +93,10 @@ def codename(self): """ identifier stored in timeseries db """ return self._makekey(self.name) + # TODO: This method needs to be refactored when adding the other db @staticmethod def _makekey(value): - """ makes value suited for influxdb key """ + """ makes value suited for InfluxDB key """ value = value.replace('.', '_') return slugify(value).replace('-', '_') @@ -167,7 +164,7 @@ def write(self, value, time=None, database=None, check=True, extra_values=None): values.update(extra_values) signal_kwargs = dict(sender=self.__class__, metric=self, values=values) pre_metric_write.send(**signal_kwargs) - write( + timeseries_db.write( name=self.key, values=values, tags=self.tags, @@ -181,30 +178,11 @@ def write(self, value, time=None, database=None, check=True, extra_values=None): return self.check_threshold(value, time) - def read(self, since=None, limit=1, order=None, extra_fields=None): + def read(self, **kwargs): """ reads timeseries data """ - fields = self.field_name - if extra_fields and extra_fields != '*': - fields = ', '.join([fields] + extra_fields) - elif extra_fields == '*': - fields = '*' - q = 'SELECT {fields} FROM {key}'.format(fields=fields, key=self.key) - tags = self.tags - conditions = [] - if since: - conditions.append("time >= {0}".format(since)) - if tags: - conditions.append( - ' AND '.join(["{0} = '{1}'".format(*tag) for tag in tags.items()]) - ) - if conditions: - conditions = 'WHERE %s' % ' AND '.join(conditions) - q = '{0} {1}'.format(q, conditions) - if order: - q = '{0} ORDER BY {1}'.format(q, order) - if limit: - q = '{0} LIMIT {1}'.format(q, limit) - return list(query(q, epoch='s').get_points()) + return timeseries_db.read( + key=self.key, fields=self.field_name, tags=self.tags, **kwargs + ) def _notify_users(self, notification_type, alert_settings): """ creates notifications for users """ @@ -222,46 +200,6 @@ class AbstractChart(TimeStampedEditableModel): configuration = models.CharField( max_length=16, null=True, choices=get_chart_configuration_choices() ) - - class Meta: - abstract = True - - def __str__(self): - return str(self.label) or self.metric.name - - def clean(self): - self._clean_query() - - _FORBIDDEN = ['drop', 'create', 'delete', 'alter', 'into'] - _AGGREGATE = [ - 'COUNT', - 'DISTINCT', - 'INTEGRAL', - 'MEAN', - 'MEDIAN', - 'MODE', - 'SPREAD', - 
'STDDEV',
-        'SUM',
-        'BOTTOM',
-        'FIRST',
-        'LAST',
-        'MAX',
-        'MIN',
-        'PERCENTILE',
-        'SAMPLE',
-        'TOP',
-        'CEILING',
-        'CUMULATIVE_SUM',
-        'DERIVATIVE',
-        'DIFFERENCE',
-        'ELAPSED',
-        'FLOOR',
-        'HISTOGRAM',
-        'MOVING_AVERAGE',
-        'NON_NEGATIVE_DERIVATIVE',
-        'HOLT_WINTERS',
-    ]
     GROUP_MAP = {
         '1d': '10m',
         '3d': '20m',
@@ -271,18 +209,20 @@ def clean(self):
     }
     DEFAULT_TIME = '7d'
 
-    @classmethod
-    def _is_query_allowed(cls, query):
-        for word in cls._FORBIDDEN:
-            if word in query.lower():
-                msg = _('the word "{0}" is not allowed').format(word.upper())
-                raise ValidationError({'configuration': msg})
+    class Meta:
+        abstract = True
+
+    def __str__(self):
+        return str(self.label) or self.metric.name
+
+    def clean(self):
+        self._clean_query()
 
     def _clean_query(self):
         try:
-            self._is_query_allowed(self.query)
-            query(self.get_query())
-        except InfluxDBClientError as e:
+            timeseries_db.validate_query(self.query)
+            timeseries_db.query(self.get_query())
+        except timeseries_db.client_error as e:
            raise ValidationError({'configuration': e}) from e
         except InvalidChartConfigException as e:
             raise ValidationError({'configuration': str(e)}) from e
@@ -338,10 +278,7 @@ def unit(self):
 
     @property
     def query(self):
-        query = self.config_dict['query']
-        if query:
-            return query['influxdb']
-        return self._default_query
+        return self.config_dict['query'] or self._default_query
 
     @property
     def top_fields(self):
@@ -349,15 +286,11 @@ def top_fields(self):
 
     @property
     def _default_query(self):
-        q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'"
+        q = default_chart_query[0]
         if self.metric.object_id:
-            q += " AND content_type = '{content_type}' AND object_id = '{object_id}'"
+            q += default_chart_query[1]
         return q
 
-    _fields_regex = re.compile(
-        r'(?P<group>\{fields\|(?P<func>\w+)(?:\|(?P<op>.*?))?\})', flags=re.IGNORECASE
-    )
-
     def get_query(
         self,
         time=DEFAULT_TIME,
@@ -366,53 +299,36 @@ def get_query(
         query=None,
         timezone=settings.TIME_ZONE,
     ):
-        m = self.metric
         query = query or self.query
+        params = self._get_query_params(time)
+        return timeseries_db.get_query(
+            self.type, params, time, self.GROUP_MAP, summary, fields, query, timezone
+        )
+
+    def get_top_fields(self, number):
+        """
+        Returns list of top ``number`` of fields (highest sum) of a
+        measurement in the specified time range (descending order).
+        """
+        q = self._default_query.replace('{field_name}', '{fields}')
+        params = self._get_query_params(self.DEFAULT_TIME)
+        return timeseries_db._get_top_fields(
+            query=q,
+            chart_type=self.type,
+            group_map=self.GROUP_MAP,
+            number=number,
+            params=params,
+            time=self.DEFAULT_TIME,
+        )
+
+    def _get_query_params(self, time):
+        m = self.metric
         params = dict(field_name=m.field_name, key=m.key, time=self._get_time(time))
         if m.object_id:
             params.update(
                 {'content_type': m.content_type_key, 'object_id': m.object_id}
             )
-        query = self._fields(fields, query)
-        query = query.format(**params)
-        query = self._group_by(query, time, strip=summary)
-        if summary:
-            query = '{0} LIMIT 1'.format(query)
-        return "{0} tz('{1}')".format(query, timezone)
-
-    def _fields(self, fields, query):
-        """
-        support substitution of {fields|<func>|<op>}
-        with <func>(field1) AS field1 <op>,
-             <func>(field2) AS field2 <op>
-        """
-        matches = re.search(self._fields_regex, query)
-        if not matches and not fields:
-            return query
-        elif matches and not fields:
-            groups = matches.groupdict()
-            fields_key = groups.get('group')
-            fields = [self.metric.field_name]
-        if fields and matches:
-            groups = matches.groupdict()
-            function = groups['func']  # required
-            operation = groups.get('op')  # optional
-            fields = [self.__transform_field(f, function, operation) for f in fields]
-            fields_key = groups.get('group')
-        else:
-            fields_key = '{fields}'
-        if fields:
-            selected_fields = ', '.join(fields)
-        return query.replace(fields_key, selected_fields)
-
-    def __transform_field(self, field, function, operation=None):
-        if operation:
-            operation = ' {}'.format(operation)
-        else:
-            operation = ''
-        return '{0}("{1}"){3} AS {2}'.format(
-            function, field, field.replace('-', '_'), operation
-        )
+        return params
 
     def _get_time(self, time):
         if not isinstance(time, str):
@@ -429,51 +345,6 @@ def _get_time(self, time):
             time = str(now - timedelta(days=days))[0:19]
         return time
 
-    def _is_aggregate(self, q):
-        q = q.upper()
-        for word in self._AGGREGATE:
-            if any(['%s(' % word in q, '|%s}' % word in q, '|%s|' % word in q]):
-                return True
-        return False
-
-    _group_by_regex = re.compile(r'GROUP BY time\(\w+\)', flags=re.IGNORECASE)
-
-    def _group_by(self, query, time, strip=False):
-        if not self._is_aggregate(query):
-            return query
-        if not strip and not self.type == 'histogram':
-            value = self.GROUP_MAP[time]
-            group_by = 'GROUP BY time({0})'.format(value)
-        else:
-            # can be empty when getting summaries
-            group_by = ''
-        if 'GROUP BY' not in query.upper():
-            query = '{0} {1}'.format(query, group_by)
-        else:
-            query = re.sub(self._group_by_regex, group_by, query)
-        return query
-
-    def _get_top_fields(self, number, time=DEFAULT_TIME, timezone=settings.TIME_ZONE):
-        """
-        Returns list of top ``number`` of fields (highes sum) of a
-        measurement in the specified time range (descending order).
- """ - q = self._default_query.replace('{field_name}', '{fields}') - q = self.get_query( - query=q, summary=True, fields=['SUM(*)'], time=time, timezone=timezone - ) - res = list(query(q, epoch='s').get_points()) - if not res: - return [] - res = res[0] - res = {key: value for key, value in res.items() if value is not None} - sorted_dict = OrderedDict(sorted(res.items(), key=operator.itemgetter(1))) - del sorted_dict['time'] - keys = list(sorted_dict.keys()) - keys.reverse() - top = keys[0:number] - return [item.replace('sum_', '') for item in top] - def read( self, decimal_places=2, @@ -487,7 +358,7 @@ def read( try: query_kwargs = dict(time=time, timezone=timezone) if self.top_fields: - fields = self._get_top_fields(self.top_fields) + fields = self.get_top_fields(self.top_fields) data_query = self.get_query(fields=fields, **query_kwargs) summary_query = self.get_query( fields=fields, summary=True, **query_kwargs @@ -495,9 +366,9 @@ def read( else: data_query = self.get_query(**query_kwargs) summary_query = self.get_query(summary=True, **query_kwargs) - points = list(query(data_query, epoch='s').get_points()) - summary = list(query(summary_query, epoch='s').get_points()) - except InfluxDBClientError as e: + points = timeseries_db.get_list_query(data_query) + summary = timeseries_db.get_list_query(summary_query) + except timeseries_db.client_error as e: logging.error(e, exc_info=True) raise e for point in points: @@ -524,7 +395,7 @@ def read( for key, value in summary[0].items(): if key == 'time': continue - if not self._is_aggregate(self.query): + if not timeseries_db.validate_query(self.query): value = None elif value: value = self._round(value, decimal_places) diff --git a/openwisp_monitoring/monitoring/charts.py b/openwisp_monitoring/monitoring/charts.py index f12f7eb73..e51a7d316 100644 --- a/openwisp_monitoring/monitoring/charts.py +++ b/openwisp_monitoring/monitoring/charts.py @@ -1,4 +1,5 @@ from django.utils.translation import gettext_lazy as _ +from openwisp_monitoring.db import chart_query from . 
import settings as app_settings @@ -44,13 +45,7 @@ ], 'fixed_value': 100, }, - 'query': { - 'influxdb': ( - "SELECT MEAN({field_name})*100 AS uptime FROM {key} WHERE " - "time >= '{time}' AND content_type = '{content_type}' AND " - "object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['uptime'], }, 'packet_loss': { 'type': 'bar', @@ -63,13 +58,7 @@ 'unit': '%', 'colors': [DEFAULT_COLORS[3]], 'order': 210, - 'query': { - 'influxdb': ( - "SELECT MEAN(loss) AS packet_loss FROM {key} WHERE " - "time >= '{time}' AND content_type = '{content_type}' AND " - "object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['packet_loss'], }, 'rtt': { 'type': 'scatter', @@ -84,14 +73,7 @@ ], 'unit': f' {_("ms")}', 'order': 220, - 'query': { - 'influxdb': ( - "SELECT MEAN(rtt_avg) AS RTT_average, MEAN(rtt_max) AS " - "RTT_max, MEAN(rtt_min) AS RTT_min FROM {key} WHERE " - "time >= '{time}' AND content_type = '{content_type}' AND " - "object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['rtt'], }, 'wifi_clients': { 'type': 'bar', @@ -103,13 +85,7 @@ 'summary_labels': [_('Total Unique WiFi clients')], 'unit': '', 'order': 230, - 'query': { - 'influxdb': ( - "SELECT COUNT(DISTINCT({field_name})) AS wifi_clients FROM {key} " - "WHERE time >= '{time}' AND content_type = '{content_type}' " - "AND object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['wifi_clients'], }, 'traffic': { 'type': 'scatter', @@ -122,14 +98,7 @@ 'summary_labels': [_('Total download traffic'), _('Total upload traffic')], 'unit': f' {_("GB")}', 'order': 240, - 'query': { - 'influxdb': ( - "SELECT SUM(tx_bytes) / 1000000000 AS upload, " - "SUM(rx_bytes) / 1000000000 AS download FROM {key} " - "WHERE time >= '{time}' AND content_type = '{content_type}' " - "AND object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['traffic'], }, 'memory': { 'type': 'scatter', @@ -139,13 +108,7 @@ 'unit': '%', 'colors': [DEFAULT_COLORS[4]], 'order': 250, - 'query': { - 'influxdb': ( - "SELECT 100 * MEAN(percent_used) AS memory_usage " - "FROM {key} WHERE time >= '{time}' AND content_type = '{content_type}' " - "AND object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['memory'], }, 'cpu': { 'type': 'scatter', @@ -158,13 +121,7 @@ 'unit': '%', 'colors': [DEFAULT_COLORS[-3]], 'order': 260, - 'query': { - 'influxdb': ( - "SELECT 100 * MEAN(cpu_usage) AS CPU_load FROM {key} WHERE " - "time >= '{time}' AND content_type = '{content_type}' AND " - "object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['cpu'], }, 'disk': { 'type': 'scatter', @@ -176,13 +133,7 @@ 'unit': '%', 'colors': [DEFAULT_COLORS[-1]], 'order': 270, - 'query': { - 'influxdb': ( - "SELECT 100 * MEAN(used_disk) AS disk_usage FROM {key} WHERE " - "time >= '{time}' AND content_type = '{content_type}' AND " - "object_id = '{object_id}' GROUP BY time(1d)" - ) - }, + 'query': chart_query['disk'], }, } @@ -207,10 +158,7 @@ def get_chart_configuration(): assert 'description' in options assert 'order' in options assert 'query' in options - if options['query'] is not None: - assert isinstance(options['query'], dict) - assert 'influxdb' in options['query'] - else: + if options['query'] is None: assert 'unit' in options if 'colorscale' in options: assert 'max' in options['colorscale'] diff --git a/openwisp_monitoring/monitoring/settings.py b/openwisp_monitoring/monitoring/settings.py index 24c577cc4..2fe157d05 100644 --- 
a/openwisp_monitoring/monitoring/settings.py +++ b/openwisp_monitoring/monitoring/settings.py @@ -1,9 +1,3 @@ from django.conf import settings -INFLUXDB_HOST = getattr(settings, 'INFLUXDB_HOST', 'localhost') -INFLUXDB_PORT = getattr(settings, 'INFLUXDB_PORT', '8086') -INFLUXDB_USER = getattr(settings, 'INFLUXDB_USER') -INFLUXDB_PASSWORD = getattr(settings, 'INFLUXDB_PASSWORD') -INFLUXDB_DATABASE = getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2') - ADDITIONAL_CHARTS = getattr(settings, 'OPENWISP_MONITORING_CHARTS', {}) diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py index 35068dd8d..3741a442d 100644 --- a/openwisp_monitoring/monitoring/tests/__init__.py +++ b/openwisp_monitoring/monitoring/tests/__init__.py @@ -5,8 +5,8 @@ from openwisp_users.tests.utils import TestOrganizationMixin -from .. import settings as app_settings -from ..utils import create_database, get_db, query +from ...db import timeseries_db +from ...db.backends import TIMESERIES_DB start_time = now() ten_minutes_ago = start_time - timedelta(minutes=10) @@ -16,21 +16,23 @@ class TestMonitoringMixin(TestOrganizationMixin): - ORIGINAL_DB = app_settings.INFLUXDB_DATABASE - TEST_DB = '{0}_test'.format(ORIGINAL_DB) + ORIGINAL_DB = TIMESERIES_DB['NAME'] + TEST_DB = f'{ORIGINAL_DB}_test' @classmethod def setUpClass(cls): - setattr(app_settings, 'INFLUXDB_DATABASE', cls.TEST_DB) - create_database() + # By default timeseries_db.get_db shall connect to the database + # defined in settings when apps are loaded. We don't want that while testing + timeseries_db.db_name = cls.TEST_DB + del timeseries_db.get_db + timeseries_db.create_database() @classmethod def tearDownClass(cls): - get_db().drop_database(cls.TEST_DB) - setattr(app_settings, 'INFLUXDB_DATABASE', cls.ORIGINAL_DB) + timeseries_db.drop_database() def tearDown(self): - query('DROP SERIES FROM /.*/') + timeseries_db.delete_metric_data() def _create_general_metric(self, **kwargs): opts = { diff --git a/openwisp_monitoring/monitoring/tests/test_charts.py b/openwisp_monitoring/monitoring/tests/test_charts.py index 744683703..86e3c1f34 100644 --- a/openwisp_monitoring/monitoring/tests/test_charts.py +++ b/openwisp_monitoring/monitoring/tests/test_charts.py @@ -119,7 +119,7 @@ def test_read_summary_top_fields_acid(self): ) data = c.read() self.assertEqual(data['summary'], {'google': 87500000, 'facebook': 37503000}) - self.assertEqual(c._get_top_fields(2), ['google', 'facebook']) + self.assertEqual(c.get_top_fields(2), ['google', 'facebook']) def test_read_multiple(self): c = self._create_chart(test_data=None, configuration='multiple_test') @@ -166,14 +166,6 @@ def test_read_bad_query(self): else: self.fail('ValidationError not raised') - def test_default_query(self): - c = self._create_chart(test_data=False) - q = ( - "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " - "content_type = '{content_type}' AND object_id = '{object_id}'" - ) - self.assertEqual(c.query, q) - def test_get_query(self): c = self._create_chart(test_data=False) m = c.metric @@ -190,24 +182,6 @@ def test_get_query(self): expected = "{0} tz('{1}')".format(expected, settings.TIME_ZONE) self.assertEqual(c.get_query(), expected) - def test_forbidden_queries(self): - c = self._create_chart(test_data=False) - queries = [ - 'DROP DATABASE openwisp2', - 'DROP MEASUREMENT test_metric', - 'CREATE DATABASE test', - 'DELETE MEASUREMENT test_metric', - 'ALTER RETENTION POLICY policy', - 'SELECT * INTO metric2 FROM test_metric', - ] - for q in 
queries: - try: - c._is_query_allowed(q) - except ValidationError as e: - self.assertIn('configuration', e.message_dict) - else: - self.fail('ValidationError not raised') - def test_description(self): c = self._create_chart(test_data=False) self.assertEqual(c.description, 'Dummy chart for testing purposes.') @@ -231,20 +205,6 @@ def test_wifi_hostapd(self): # last 10 days self.assertEqual(data['traces'][0][1][-10:], [0, 2, 2, 2, 2, 2, 2, 2, 2, 4]) - def test_get_query_1d(self): - c = self._create_chart(test_data=None, configuration='uptime') - q = c.get_query(time='1d') - last24 = now() - timedelta(days=1) - self.assertIn(str(last24)[0:14], q) - self.assertIn('group by time(10m)', q.lower()) - - def test_get_query_30d(self): - c = self._create_chart(test_data=None, configuration='uptime') - q = c.get_query(time='30d') - last30d = now() - timedelta(days=30) - self.assertIn(str(last30d)[0:10], q) - self.assertIn('group by time(24h)', q.lower()) - def test_get_time(self): c = Chart() now_ = now() @@ -253,38 +213,13 @@ def test_get_time(self): self.assertIn(str(now() - timedelta(days=1))[0:10], c._get_time('1d')) self.assertIn(str(now() - timedelta(days=3))[0:10], c._get_time('3d')) - def test_get_query_fields_function(self): - c = self._create_chart(test_data=None, configuration='histogram') - q = c.get_query(fields=['ssh', 'http2', 'apple-music']) - expected = ( - 'SELECT SUM("ssh") / 1 AS ssh, ' - 'SUM("http2") / 1 AS http2, ' - 'SUM("apple-music") / 1 AS apple_music FROM' - ) - self.assertIn(expected, q) - - def test_get_custom_query(self): - c = self._create_chart(test_data=None) - custom_q = c._default_query.replace('{field_name}', '{fields}') - q = c.get_query(query=custom_q, fields=['SUM(*)']) - self.assertIn('SELECT SUM(*) FROM', q) - def test_get_top_fields(self): c = self._create_chart(test_data=None, configuration='histogram') + self.assertEqual(c.get_top_fields(number=3), []) c.metric.write( None, extra_values={'http2': 100, 'ssh': 90, 'udp': 80, 'spdy': 70} ) - self.assertEqual(c._get_top_fields(number=3), ['http2', 'ssh', 'udp']) - - def test_is_aggregate_bug(self): - m = self._create_object_metric(name='summary_avg') - c = Chart(metric=m, configuration='dummy') - self.assertFalse(c._is_aggregate(c.query)) - - def test_is_aggregate_fields_function(self): - m = self._create_object_metric(name='is_aggregate_func') - c = Chart(metric=m, configuration='uptime') - self.assertTrue(c._is_aggregate(c.query)) + self.assertEqual(c.get_top_fields(number=3), ['http2', 'ssh', 'udp']) def test_query_histogram(self): m = self._create_object_metric(name='histogram') diff --git a/openwisp_monitoring/monitoring/tests/test_db_creation.py b/openwisp_monitoring/monitoring/tests/test_db_creation.py index b6d0b83d1..23b62f78a 100644 --- a/openwisp_monitoring/monitoring/tests/test_db_creation.py +++ b/openwisp_monitoring/monitoring/tests/test_db_creation.py @@ -6,25 +6,21 @@ from django.test import TestCase from requests.exceptions import ConnectionError -from ..utils import get_db - -def mock_create_database(self, db): +def mock_create_database(**kwargs): raise ConnectionError class TestDatabase(TestCase): app = 'monitoring' - @patch('openwisp_monitoring.monitoring.settings.INFLUXDB_DATABASE', 'test_db') @patch('openwisp_monitoring.monitoring.apps.MonitoringConfig.warn_and_delay') - @patch('influxdb.client.InfluxDBClient.create_database', mock_create_database) + @patch('openwisp_monitoring.db.timeseries_db.create_database', mock_create_database) def test_check_retry(self, mock): try: 
apps.get_app_config(self.app).create_database() except ConnectionError: pass - get_db().drop_database('test_db') self.assertEqual(mock.call_count, 5) @patch('openwisp_monitoring.monitoring.apps.sleep', return_value=None) @@ -34,7 +30,7 @@ def test_warn_and_delay(self, mock): apps.get_app_config(self.app).warn_and_delay(1) self.assertEqual( f.getvalue(), - 'Got error while connecting to timeseries DB. ' + 'Got error while connecting to timeseries database. ' 'Retrying again in 3 seconds (attempt n. 1 out of 5).\n', ) mock.assert_called_with(3) diff --git a/openwisp_monitoring/monitoring/tests/test_models.py b/openwisp_monitoring/monitoring/tests/test_models.py index fc9c6fb43..60ebe2cf4 100644 --- a/openwisp_monitoring/monitoring/tests/test_models.py +++ b/openwisp_monitoring/monitoring/tests/test_models.py @@ -10,7 +10,6 @@ from openwisp_utils.tests import catch_signal from ..signals import post_metric_write, pre_metric_write, threshold_crossed -from ..utils import query, write from . import TestMonitoringMixin start_time = timezone.now() @@ -107,72 +106,6 @@ def test_get_or_create_renamed_object(self): self.assertEqual(m2.name, m.name) self.assertFalse(created) - def test_write(self): - write('test_write', dict(value=2), database=self.TEST_DB) - measurement = list(query('select * from test_write').get_points())[0] - self.assertEqual(measurement['value'], 2) - - def test_general_write(self): - m = self._create_general_metric(name='Sync test') - m.write(1) - measurement = list(query('select * from sync_test').get_points())[0] - self.assertEqual(measurement['value'], 1) - - def test_object_write(self): - om = self._create_object_metric() - om.write(3) - content_type = '.'.join(om.content_type.natural_key()) - q = ( - "select * from test_metric WHERE object_id = '{0}'" - " AND content_type = '{1}'".format(om.object_id, content_type) - ) - measurement = list(query(q).get_points())[0] - self.assertEqual(measurement['value'], 3) - - def test_general_same_key_different_fields(self): - down = self._create_general_metric( - name='traffic (download)', key='traffic', field_name='download' - ) - down.write(200) - up = self._create_general_metric( - name='traffic (upload)', key='traffic', field_name='upload' - ) - up.write(100) - measurement = list(query('select download from traffic').get_points())[0] - self.assertEqual(measurement['download'], 200) - measurement = list(query('select upload from traffic').get_points())[0] - self.assertEqual(measurement['upload'], 100) - - def test_object_same_key_different_fields(self): - user = self._create_user() - user_down = self._create_object_metric( - name='traffic (download)', - key='traffic', - field_name='download', - content_object=user, - ) - user_down.write(200) - user_up = self._create_object_metric( - name='traffic (upload)', - key='traffic', - field_name='upload', - content_object=user, - ) - user_up.write(100) - content_type = '.'.join(user_down.content_type.natural_key()) - q = ( - "select download from traffic WHERE object_id = '{0}'" - " AND content_type = '{1}'".format(user_down.object_id, content_type) - ) - measurement = list(query(q).get_points())[0] - self.assertEqual(measurement['download'], 200) - q = ( - "select upload from traffic WHERE object_id = '{0}'" - " AND content_type = '{1}'".format(user_up.object_id, content_type) - ) - measurement = list(query(q).get_points())[0] - self.assertEqual(measurement['upload'], 100) - def test_read_general_metric(self): m = self._create_general_metric(name='load') m.write(50, check=False) diff --git 
a/openwisp_monitoring/monitoring/utils.py b/openwisp_monitoring/monitoring/utils.py deleted file mode 100644 index a7cebb4a2..000000000 --- a/openwisp_monitoring/monitoring/utils.py +++ /dev/null @@ -1,62 +0,0 @@ -import logging -from datetime import datetime - -from influxdb import client - -from . import settings - -logger = logging.getLogger(__name__) - - -def get_db(): - """ Returns an ``InfluxDBClient`` instance """ - return client.InfluxDBClient( - settings.INFLUXDB_HOST, - settings.INFLUXDB_PORT, - settings.INFLUXDB_USER, - settings.INFLUXDB_PASSWORD, - settings.INFLUXDB_DATABASE, - ) - - -def query(query, params=None, epoch=None, expected_response_code=200, database=None): - """ Wrapper around ``InfluxDBClient.query()`` """ - db = get_db() - database = database or settings.INFLUXDB_DATABASE - return db.query( - query, - params, - epoch=epoch, - expected_response_code=expected_response_code, - database=database, - ) - - -def write( - name, values, tags=None, timestamp=None, database=None, retention_policy=None -): - """ Method to be called via threading module. """ - point = {'measurement': name, 'tags': tags, 'fields': values} - if isinstance(timestamp, datetime): - timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') - if timestamp: - point['time'] = timestamp - try: - get_db().write( - {'points': [point]}, - {'db': database or settings.INFLUXDB_DATABASE, 'rp': retention_policy}, - ) - except Exception as e: - raise e - - -def create_database(): - """ creates database if necessary """ - db = get_db() - response = db.query('SHOW DATABASES') - items = list(response.get_points('databases')) - databases = [database['name'] for database in items] - # if database does not exists, create it - if settings.INFLUXDB_DATABASE not in databases: - db.create_database(settings.INFLUXDB_DATABASE) - logger.info(f'Created influxdb database {settings.INFLUXDB_DATABASE}') diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py index e9db2da16..94ff659f6 100644 --- a/tests/openwisp2/settings.py +++ b/tests/openwisp2/settings.py @@ -16,6 +16,15 @@ } } +TIMESERIES_DATABASE = { + 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', + 'USER': 'openwisp', + 'PASSWORD': 'openwisp', + 'NAME': 'openwisp2', + 'HOST': 'localhost', + 'PORT': '8086', +} + SECRET_KEY = 'fn)t*+$)ugeyip6-#txyy$5wf2ervc0d2n#h)qb)y5@ly$t*@w' INSTALLED_APPS = [ @@ -136,10 +145,6 @@ OPENWISP_MONITORING_MANAGEMENT_IP_ONLY = False -INFLUXDB_USER = 'openwisp' -INFLUXDB_PASSWORD = 'openwisp' -INFLUXDB_DATABASE = 'openwisp2' - CACHES = { 'default': { 'BACKEND': 'django_redis.cache.RedisCache', @@ -170,6 +175,37 @@ EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend' CELERY_EMAIL_BACKEND = EMAIL_BACKEND +# chart configuration queries of InfluxDB for automated tests +test_query = { + 'histogram': ( + "SELECT {fields|SUM|/ 1} FROM {key} " + "WHERE time >= '{time}' AND content_type = " + "'{content_type}' AND object_id = '{object_id}'" + ), + 'bad_test': "BAD", + 'default': ( + "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " + "content_type = '{content_type}' AND object_id = '{object_id}'" + ), + 'multiple_test': ( + "SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND " + "content_type = '{content_type}' AND object_id = '{object_id}'" + ), + 'mean_test': ( + "SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " + "content_type = '{content_type}' AND object_id = '{object_id}'" + ), + 'sum_test': ( + "SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time 
>= '{time}' AND " + "content_type = '{content_type}' AND object_id = '{object_id}'" + ), + 'top_fields_mean': ( + "SELECT {fields|MEAN} FROM {key} " + "WHERE time >= '{time}' AND content_type = " + "'{content_type}' AND object_id = '{object_id}'" + ), +} + # this custom chart configuration is used for automated testing purposes OPENWISP_MONITORING_CHARTS = { 'histogram': { @@ -178,13 +214,7 @@ 'description': 'Histogram', 'top_fields': 2, 'order': 999, - 'query': { - 'influxdb': ( - "SELECT {fields|SUM|/ 1} FROM {key} " - "WHERE time >= '{time}' AND content_type = " - "'{content_type}' AND object_id = '{object_id}'" - ) - }, + 'query': test_query['histogram'], }, 'dummy': { 'type': 'line', @@ -200,7 +230,7 @@ 'description': 'Bugged chart for testing purposes.', 'unit': 'bugs', 'order': 999, - 'query': {'influxdb': "BAD"}, + 'query': test_query['bad_test'], }, 'default': { 'type': 'line', @@ -208,12 +238,7 @@ 'description': 'Default query for testing purposes', 'unit': 'n.', 'order': 999, - 'query': { - 'influxdb': ( - "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " - "content_type = '{content_type}' AND object_id = '{object_id}'" - ) - }, + 'query': test_query['default'], }, 'multiple_test': { 'type': 'line', @@ -221,12 +246,7 @@ 'description': 'For testing purposes', 'unit': 'n.', 'order': 999, - 'query': { - 'influxdb': ( - "SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND " - "content_type = '{content_type}' AND object_id = '{object_id}'" - ) - }, + 'query': test_query['multiple_test'], }, 'mean_test': { 'type': 'line', @@ -234,12 +254,7 @@ 'description': 'For testing purposes', 'unit': 'n.', 'order': 999, - 'query': { - 'influxdb': ( - "SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " - "content_type = '{content_type}' AND object_id = '{object_id}'" - ) - }, + 'query': test_query['mean_test'], }, 'sum_test': { 'type': 'line', @@ -247,12 +262,7 @@ 'description': 'For testing purposes', 'unit': 'n.', 'order': 999, - 'query': { - 'influxdb': ( - "SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " - "content_type = '{content_type}' AND object_id = '{object_id}'" - ) - }, + 'query': test_query['sum_test'], }, 'top_fields_mean': { 'type': 'histogram', @@ -260,13 +270,7 @@ 'description': 'For testing purposes', 'top_fields': 2, 'order': 999, - 'query': { - 'influxdb': ( - "SELECT {fields|MEAN} FROM {key} " - "WHERE time >= '{time}' AND content_type = " - "'{content_type}' AND object_id = '{object_id}'" - ) - }, + 'query': test_query['top_fields_mean'], }, }