Skip to content

Commit

Permalink
[monitoring] Initial commit for resolving openwisp#274
Browse files Browse the repository at this point in the history
I realized that this is a big issue and will take some time to be resolved completely so this is the initial code for it

Changes made:

 - added another backend in /home/abhigya/Documents/OpenSource/openwisp/monitoring/openwisp-monitoring/openwisp_monitoring/db/backends
 - made changes to the __init__ file of backends to now deal with two possible backends and their different requirements
 - added initial code for the influxb2.x backend client
 - wrote initial code for a method to 'standardize' query results. This will have to be implemented for all the backends in future
  • Loading branch information
AbhigyaShridhar committed Apr 12, 2022
1 parent 04d6447 commit d88fe65
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 10 deletions.
18 changes: 8 additions & 10 deletions openwisp_monitoring/db/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@

TIMESERIES_DB = getattr(settings, 'TIMESERIES_DATABASE', None)
if not TIMESERIES_DB:
TIMESERIES_DB = {
'BACKEND': 'openwisp_monitoring.db.backends.influxdb',
'USER': getattr(settings, 'INFLUXDB_USER', 'openwisp'),
'PASSWORD': getattr(settings, 'INFLUXDB_PASSWORD', 'openwisp'),
'NAME': getattr(settings, 'INFLUXDB_DATABASE', 'openwisp2'),
'HOST': getattr(settings, 'INFLUXDB_HOST', 'localhost'),
'PORT': getattr(settings, 'INFLUXDB_PORT', '8086'),
}
logger.warning(
'Timeseries database not set up in settings'
'The previous method to define Timeseries Database has been deprecated. Please refer to the docs:\n'
'https://github.com/openwisp/openwisp-monitoring#setup-integrate-in-an-existing-django-project'
)
Expand All @@ -30,11 +23,16 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
"""
try:
assert 'BACKEND' in TIMESERIES_DB, 'BACKEND'
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
assert 'NAME' in TIMESERIES_DB, 'NAME'
assert 'HOST' in TIMESERIES_DB, 'HOST'
assert 'PORT' in TIMESERIES_DB, 'PORT'
backend = TIMESERIES_DB['BACKEND']
if backend == 'openwisp_monitoring.db.backends.influxdb':
assert 'USER' in TIMESERIES_DB, 'USER'
assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD'
else if backend == 'openwisp_monitoring.db.backends.influxdb2':
assert 'ORG' in TIMESERIES_DB, 'ORG'
assert 'TOKEN' in TIMESERIES_DB, 'TOKEN'
if module:
return import_module(f'{backend_name}.{module}')
else:
Expand Down
8 changes: 8 additions & 0 deletions openwisp_monitoring/db/backends/influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ def write(self, name, values, **kwargs):
return
raise TimeseriesWriteException

def standardizeResult(self, resultObject):
"""
return the query result in the form of a list of dictionaries
to make all the back ends uniform
"""
#TODO
pass

def read(self, key, fields, tags, **kwargs):
extra_fields = kwargs.get('extra_fields')
since = kwargs.get('since')
Expand Down
Empty file.
118 changes: 118 additions & 0 deletions openwisp_monitoring/db/backends/influxdb2/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import logging
import operator
import re
from collections import OrderedDict
from datetime import datetime

from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils.functional import cached_property
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS

from openwisp_monitoring.utils import retry

from ...exceptions import TimeseriesWriteException
from .. import TIMESERIES_DB

logger = logging.getLogger(__name__)

class DatabaseClient(object):
backend_name = 'influxdb2'

def __init__(self, db_name=None):
self._db = None
self.db_name = db_name or TIMESERIES_DB['NAME']
self.client_error = InfluxDBError
self.write_api = None
self.query_api = None

@cached_property
def db(self):
"Returns an InfluxDBClient instance"
return InfluxDBClient{
url = f"http://{TIMESERIES_DB["HOST"]}:{TIMESERIES_DB["PORT"]}",
bucket = self.db_name,
token = TIMESERIES_DB["TOKEN"]
}

@retry
def create_database(self):
"creates a new bucket if not already found"
bucket = self.db.buckets_api().find_bucket_by_name(self.db_name)
if bucket is None:
bucket = self.db.buckets_api().create_bucket(bucket_name=self.db_name, org=TIMESERIES_DB["ORG"])
logger.debug(f'Created InfluxDB2 bucket "{self.db_name}"')
else:
logger.debug(f'Bucket named "{self.db_name}" found')

@retry
def drop_database(self):
"deletes a bucket if it exists"
bucket = self.db.buckets_api().find_bucket_by_name(self.db_name)
if bucket is None:
logger.debug(f'No such bucket: "{self.db_name}"')
else:
self.db.buckets_api().delete_bucket(bucket)
logger.debug(f'Bucket named "{self.db_name}" deleted')

@retry
def query(self, query):
resultObject = self.query_api.query(query)
return resultObject

def standardizeResult(self, resultObject):
"""
return the query result in the form of a list of dictionaries
to make all the back ends uniform
"""
result = list()

for table in tables:
for row in table.records:
listObject = {
"time": row["_time"],
"field": row["_field"],
"value": row["_value"],
"mesurement": row["_measurement"]
}
result.append(listObject)

return result

def write(self, name, values, **kwargs):
"""
values can either be a list or a string
"""
timestamp = kwargs.get('timestamp') or now()
point = Point(name).time(timestamp.isoformat(sep='T', timespec='microseconds'))
tags = kwargs.get('tags')
values = kwargs.get('values')

for tag in tags:
if type(tag) == str:
tag = tag.split('=')
point.tag(tag[0], tag[1])

for value in values:
if type(value) == str:
value = tag.split('=')
point.field(value[0], value[1])

try:
self.write_api.write(bucket=self.db_name, record=point)
except Exception as e:
except Exception as exception:
logger.warning(f'got exception while writing to tsdb: {exception}')
if isinstance(exception, self.client_error):
exception_code = getattr(exception, 'code', None)
exception_message = getattr(exception, 'content')
if (
exception_code == 400
and 'points beyond retention policy dropped' in exception_message
):
return
raise TimeseriesWriteException

0 comments on commit d88fe65

Please sign in to comment.