Skip to content

Commit

Permalink
[db] Abstracted code which deals with time series DB #65
Browse files Browse the repository at this point in the history
Implements and closes #65
  • Loading branch information
nepython authored Jun 23, 2020
1 parent 0a8b4d8 commit b2553ba
Show file tree
Hide file tree
Showing 22 changed files with 741 additions and 545 deletions.
12 changes: 9 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,15 @@ Follow the setup instructions of `openwisp-controller
]
# Make sure you change them in production
INFLUXDB_USER = 'openwisp'
INFLUXDB_PASSWORD = 'openwisp'
INFLUXDB_DATABASE = 'openwisp2'
# You can select one of the backends located in openwisp_monitoring.db.backends
TIMESERIES_DATABASE = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': 'openwisp',
'PASSWORD': 'openwisp',
'NAME': 'openwisp2',
'HOST': 'localhost',
'PORT': '8086',
}
``urls.py``:

Expand Down
7 changes: 7 additions & 0 deletions openwisp_monitoring/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .backends import timeseries_db

# Re-export the configured backend's query definitions at package level so
# the rest of the application can simply do
# ``from openwisp_monitoring.db import chart_query`` without knowing which
# timeseries backend is in use.
chart_query = timeseries_db.queries.chart_query
default_chart_query = timeseries_db.queries.default_chart_query
device_data_query = timeseries_db.queries.device_data_query

__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query']
63 changes: 63 additions & 0 deletions openwisp_monitoring/db/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
from importlib import import_module

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db import DatabaseError

logger = logging.getLogger(__name__)

# Timeseries database configuration: prefer the new-style setting …
TIMESERIES_DB = getattr(settings, 'TIMESERIES_DATABASE', None)
if not TIMESERIES_DB:
    # … and fall back to the legacy INFLUXDB_* settings for backwards
    # compatibility, warning the user that they are deprecated.
    TIMESERIES_DB = {
        'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
        'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'),
        'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'),
        'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'),
        'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'),
        'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'),
    }
    logger.warning(
        'The previous method to define Timeseries Database has been deprecated. Please refer to the docs:\n'
        'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project',
    )


def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
    """
    Returns the database backend module (or one of its submodules) given a
    fully qualified backend name, e.g.
    ``openwisp_monitoring.db.backends.influxdb``.

    :param backend_name: dotted path of the backend package
    :param module: optional submodule name (e.g. ``client`` or ``queries``)
    :raises DatabaseError: if TIMESERIES_DATABASE is not a valid mapping
    :raises ImproperlyConfigured: if a required setting key is missing or
        the backend name does not refer to an available backend
    """
    try:
        # Validate the configuration with plain ``if`` checks instead of
        # ``assert``: assertions are stripped when Python runs with -O,
        # which would silently disable this validation.
        for key in ('BACKEND', 'USER', 'PASSWORD', 'NAME', 'HOST', 'PORT'):
            if key not in TIMESERIES_DB:
                raise ImproperlyConfigured(
                    f'"{key}" field is not declared in TIMESERIES_DATABASE'
                )
        if module:
            return import_module(f'{backend_name}.{module}')
        return import_module(backend_name)
    except TypeError as e:
        # ``key in None`` raises TypeError: the setting is missing or is
        # not a mapping (the original AttributeError handler was dead code)
        raise DatabaseError('No TIMESERIES_DATABASE specified in settings') from e
    except ImportError as e:
        # The database backend wasn't found. Display a helpful error message
        # listing all built-in database backends.
        builtin_backends = ['influxdb']
        if backend_name not in [
            f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
        ]:
            raise ImproperlyConfigured(
                f"{backend_name} isn't an available database backend.\n"
                "Try using 'openwisp_monitoring.db.backends.XXX', where XXX is one of:\n"
                f"{builtin_backends}"
            ) from e
        # A built-in backend failed to import (e.g. its dependency is not
        # installed): re-raise instead of silently returning None.
        raise


# Module-level singleton client shared by the whole application; the
# backend's query definitions module is attached as ``queries`` so that
# ``timeseries_db.queries.chart_query`` etc. are reachable from one object.
timeseries_db = load_backend_module(module='client').DatabaseClient()
timeseries_db.queries = load_backend_module(module='queries')
Empty file.
286 changes: 286 additions & 0 deletions openwisp_monitoring/db/backends/influxdb/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
import logging
import operator
import re
from collections import OrderedDict
from datetime import datetime

from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from influxdb import InfluxDBClient

from .. import TIMESERIES_DB
from .exception import DatabaseException

logger = logging.getLogger(__name__)


class DatabaseClient(object):
    """
    Timeseries database backend based on InfluxDB.

    Wraps ``InfluxDBClient`` and exposes the generic API used by
    openwisp-monitoring: database and retention policy management,
    reading/writing of metric points, and helpers to build and validate
    chart queries.
    """

    # aggregation, selector and transformation functions recognized when
    # deciding whether a chart query is an aggregate query
    _AGGREGATE = [
        'COUNT',
        'DISTINCT',
        'INTEGRAL',
        'MEAN',
        'MEDIAN',
        'MODE',
        'SPREAD',
        'STDDEV',
        'SUM',
        'BOTTOM',
        'FIRST',
        'LAST',
        'MAX',
        'MIN',
        'PERCENTILE',
        'SAMPLE',
        'TOP',
        'CEILING',
        'CUMULATIVE_SUM',
        'DERIVATIVE',
        'DIFFERENCE',
        'ELAPSED',
        'FLOOR',
        'HISTOGRAM',
        'MOVING_AVERAGE',
        'NON_NEGATIVE_DERIVATIVE',
        'HOLT_WINTERS',
    ]
    # statements never allowed in user supplied chart queries
    _FORBIDDEN = ['drop', 'create', 'delete', 'alter', 'into']

    def __init__(self, db_name=None):
        """
        :param db_name: database name, defaults to ``TIMESERIES_DB['NAME']``
        """
        self._db = None
        self.db_name = db_name or TIMESERIES_DB['NAME']
        # exposed so calling code can catch backend specific client errors
        # without importing the influxdb package directly
        self.client_error = DatabaseException.client_error

    def create_database(self):
        """ creates database if necessary """
        db = self.get_db
        # InfluxDB does not create a new database, neither raise an error if database exists
        db.create_database(self.db_name)
        logger.debug(f'Created InfluxDB database "{self.db_name}"')

    def drop_database(self):
        """ drops database if it exists """
        db = self.get_db
        # InfluxDB does not raise an error if database does not exist
        db.drop_database(self.db_name)
        logger.debug(f'Dropped InfluxDB database "{self.db_name}"')

    @cached_property
    def get_db(self):
        """ Returns an ``InfluxDBClient`` instance (cached after first access) """
        return InfluxDBClient(
            TIMESERIES_DB['HOST'],
            TIMESERIES_DB['PORT'],
            TIMESERIES_DB['USER'],
            TIMESERIES_DB['PASSWORD'],
            self.db_name,
        )

    def create_or_alter_retention_policy(self, name, duration):
        """ creates or alters existing retention policy if necessary """
        db = self.get_db
        retention_policies = db.get_list_retention_policies()
        exists = False
        duration_changed = False
        for policy in retention_policies:
            if policy['name'] == name:
                exists = True
                # BUGFIX: compare the existing duration with the requested
                # one instead of storing the raw duration string (always
                # truthy), which caused an ALTER on every call.
                # NOTE(review): influxdb may normalize duration strings
                # (e.g. '24h' reported as '24h0m0s'), which could still
                # trigger a redundant — but idempotent — ALTER; confirm
                # the format used by callers.
                duration_changed = policy['duration'] != duration
                break
        if not exists:
            db.create_retention_policy(name=name, duration=duration, replication=1)
        elif duration_changed:
            db.alter_retention_policy(name=name, duration=duration)

    def query(self, query, precision=None, **kwargs):
        """
        Runs a raw query against the database.

        :param query: InfluxQL query string
        :param precision: timestamp precision (passed as ``epoch``)
        :keyword params: bind parameters for the query
        :keyword database: database to query, defaults to ``self.db_name``
        :keyword expected_response_code: HTTP status expected (default 200)
        """
        db = self.get_db
        database = kwargs.get('database') or self.db_name
        return db.query(
            query,
            kwargs.get('params'),
            epoch=precision,
            expected_response_code=kwargs.get('expected_response_code') or 200,
            database=database,
        )

    def write(self, name, values, **kwargs):
        """
        Writes a single point to the given measurement.

        :param name: measurement name
        :param values: dict of field values
        :keyword tags: dict of tags
        :keyword timestamp: ``datetime`` or pre-formatted string
        :keyword database: target database, defaults to ``self.db_name``
        :keyword retention_policy: retention policy to write to
        """
        point = {
            'measurement': name,
            'tags': kwargs.get('tags'),
            'fields': values,
        }
        timestamp = kwargs.get('timestamp')
        if isinstance(timestamp, datetime):
            # influxdb expects RFC3339 formatted timestamps
            timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
        if timestamp:
            point['time'] = timestamp
        self.get_db.write(
            {'points': [point]},
            {
                'db': kwargs.get('database') or self.db_name,
                'rp': kwargs.get('retention_policy'),
            },
        )

    def read(self, key, fields, tags, **kwargs):
        """
        Reads points from a measurement.

        :param key: measurement name
        :param fields: field(s) to select
        :param tags: dict of tag/value pairs ANDed into the WHERE clause
        :keyword extra_fields: list of extra fields, or ``'*'`` for all
        :keyword since: time lower bound (inserted verbatim in the query)
        :keyword order: ORDER BY clause contents
        :keyword limit: LIMIT clause value
        :returns: list of points (dicts) with second precision timestamps
        """
        extra_fields = kwargs.get('extra_fields')
        since = kwargs.get('since')
        order = kwargs.get('order')
        limit = kwargs.get('limit')
        if extra_fields and extra_fields != '*':
            fields = ', '.join([fields] + extra_fields)
        elif extra_fields == '*':
            fields = '*'
        q = f'SELECT {fields} FROM {key}'
        conditions = []
        if since:
            conditions.append(f'time >= {since}')
        if tags:
            conditions.append(
                ' AND '.join(["{0} = '{1}'".format(*tag) for tag in tags.items()])
            )
        if conditions:
            conditions = 'WHERE %s' % ' AND '.join(conditions)
            q = f'{q} {conditions}'
        if order:
            q = f'{q} ORDER BY {order}'
        if limit:
            q = f'{q} LIMIT {limit}'
        return list(self.query(q, precision='s').get_points())

    def get_list_query(self, query, precision='s'):
        """ runs a query and returns its points as a list """
        return list(self.query(query, precision=precision).get_points())

    def get_list_retention_policies(self):
        """ returns the list of retention policies of the database """
        return self.get_db.get_list_retention_policies()

    def delete_metric_data(self, key=None, tags=None):
        """
        deletes a specific metric based on the key and tags
        provided, you may also choose to delete all metrics
        """
        if not key and not tags:
            self.query('DROP SERIES FROM /.*/')
        else:
            self.get_db.delete_series(measurement=key, tags=tags)

    # Chart related functions below

    def validate_query(self, query):
        """
        Rejects queries containing forbidden statements; returns whether
        the query is an aggregate query (see ``_is_aggregate``).

        :raises ValidationError: if a forbidden word is present
        """
        for word in self._FORBIDDEN:
            # NOTE(review): substring match, so e.g. a field named
            # "created" would also be rejected — intentional strictness?
            if word in query.lower():
                msg = _(f'the word "{word.upper()}" is not allowed')
                raise ValidationError({'configuration': msg})
        return self._is_aggregate(query)

    def _is_aggregate(self, q):
        """ returns True if the query uses any aggregation function """
        q = q.upper()
        for word in self._AGGREGATE:
            # matches "FUNC(", "|FUNC}" or "|FUNC|" occurrences
            if any(['%s(' % word in q, '|%s}' % word in q, '|%s|' % word in q]):
                return True
        return False

    def get_query(
        self,
        chart_type,
        params,
        time,
        group_map,
        summary=False,
        fields=None,
        query=None,
        timezone=settings.TIME_ZONE,
    ):
        """
        Builds the final chart query: substitutes fields, interpolates
        params, applies GROUP BY and timezone, optionally limiting the
        result to a single summary row.
        """
        query = self._fields(fields, query, params['field_name'])
        query = query.format(**params)
        query = self._group_by(query, time, chart_type, group_map, strip=summary)
        if summary:
            query = f'{query} LIMIT 1'
        return f"{query} tz('{timezone}')"

    _group_by_regex = re.compile(r'GROUP BY time\(\w+\)', flags=re.IGNORECASE)

    def _group_by(self, query, time, chart_type, group_map, strip=False):
        """ adds, replaces or strips the GROUP BY time(...) clause """
        if not self.validate_query(query):
            return query
        if not strip and not chart_type == 'histogram':
            value = group_map[time]
            group_by = f'GROUP BY time({value})'
        else:
            # can be empty when getting summaries
            group_by = ''
        if 'GROUP BY' not in query.upper():
            query = f'{query} {group_by}'
        else:
            query = re.sub(self._group_by_regex, group_by, query)
        return query

    _fields_regex = re.compile(
        r'(?P<group>\{fields\|(?P<func>\w+)(?:\|(?P<op>.*?))?\})', flags=re.IGNORECASE
    )

    def _fields(self, fields, query, field_name):
        """
        support substitution of {fields|<FUNCTION_NAME>|<OPERATION>}
        with <FUNCTION_NAME>(field1) AS field1 <OPERATION>,
        <FUNCTION_NAME>(field2) AS field2 <OPERATION>
        """
        matches = re.search(self._fields_regex, query)
        if not matches and not fields:
            return query
        elif matches and not fields:
            groups = matches.groupdict()
            fields_key = groups.get('group')
            fields = [field_name]
        if fields and matches:
            groups = matches.groupdict()
            function = groups['func']  # required
            operation = groups.get('op')  # optional
            fields = [self.__transform_field(f, function, operation) for f in fields]
            fields_key = groups.get('group')
        else:
            fields_key = '{fields}'
        if fields:
            selected_fields = ', '.join(fields)
            return query.replace(fields_key, selected_fields)
        # BUGFIX: previously fell through returning None when ``fields``
        # ended up empty, crashing the caller's ``query.format``
        return query

    def __transform_field(self, field, function, operation=None):
        """ renders one "FUNC("field") [OP] AS alias" expression """
        if operation:
            operation = f' {operation}'
        else:
            operation = ''
        # dashes are not valid in InfluxQL aliases, replace with underscores
        return f'{function}("{field}"){operation} AS {field.replace("-", "_")}'

    def _get_top_fields(
        self,
        query,
        params,
        chart_type,
        group_map,
        number,
        time,
        timezone=settings.TIME_ZONE,
    ):
        """
        Returns the names of the ``number`` fields with the highest summary
        SUM, stripped of the ``sum_`` prefix added by InfluxDB.
        """
        q = self.get_query(
            query=query,
            params=params,
            chart_type=chart_type,
            group_map=group_map,
            summary=True,
            fields=['SUM(*)'],
            time=time,
            timezone=timezone,
        )
        res = list(self.query(q, precision='s').get_points())
        if not res:
            return []
        res = res[0]
        # discard NULL fields, sort the rest by value (descending)
        res = {key: value for key, value in res.items() if value is not None}
        sorted_dict = OrderedDict(sorted(res.items(), key=operator.itemgetter(1)))
        del sorted_dict['time']
        keys = list(sorted_dict.keys())
        keys.reverse()
        top = keys[0:number]
        return [item.replace('sum_', '') for item in top]
5 changes: 5 additions & 0 deletions openwisp_monitoring/db/backends/influxdb/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from influxdb.exceptions import InfluxDBClientError


class DatabaseException(object):
    # NOTE: despite the name, this is not an Exception subclass — it is a
    # namespace mapping generic names to backend specific exception
    # classes, so code outside the backend can reference/catch them
    # (e.g. ``timeseries_db.client_error``) without importing influxdb.
    client_error = InfluxDBClientError
Loading

0 comments on commit b2553ba

Please sign in to comment.