Skip to content

Commit

Permalink
Make changes to influxdb calling code
Browse files Browse the repository at this point in the history
  • Loading branch information
zachaysan committed Aug 2, 2024
1 parent 23a9d4b commit 09866e3
Showing 1 changed file with 85 additions and 27 deletions.
112 changes: 85 additions & 27 deletions api/app_analytics/influxdb_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import typing
from collections import defaultdict
from datetime import datetime, timedelta

from django.conf import settings
from django.utils import timezone
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS
Expand All @@ -20,11 +22,6 @@
influx_org = settings.INFLUXDB_ORG
read_bucket = settings.INFLUXDB_BUCKET + "_downsampled_15m"

range_bucket_mappings = {
"-24h": settings.INFLUXDB_BUCKET + "_downsampled_15m",
"-7d": settings.INFLUXDB_BUCKET + "_downsampled_15m",
"-30d": settings.INFLUXDB_BUCKET + "_downsampled_1h",
}
retries = Retry(connect=3, read=3, redirect=3)
# Set a timeout to prevent threads being potentially stuck open due to network weirdness
influxdb_client = InfluxDBClient(
Expand All @@ -43,6 +40,13 @@
)


def get_range_bucket_mappings(date_start: datetime) -> str:
    """
    Pick the downsampled bucket to read from for a query starting at ``date_start``.

    :param date_start: start of the queried range; it is compared against the
        current time
    :return: the name of the hourly downsampled bucket when ``date_start`` lies
        more than 10 whole days in the past, otherwise the name of the 15 minute
        downsampled bucket
    """
    # timedelta.days only counts whole days, so the switch to the hourly bucket
    # happens once the range start is at least 11 full days old.
    age_in_days = (timezone.now() - date_start).days
    suffix = "_downsampled_1h" if age_in_days > 10 else "_downsampled_15m"
    return settings.INFLUXDB_BUCKET + suffix


class InfluxDBWrapper:
def __init__(self, name):
self.name = name
Expand Down Expand Up @@ -76,23 +80,30 @@ def write(self):

@staticmethod
def influx_query_manager(
date_start: str = "-30d",
date_stop: str = "now()",
date_start: datetime | None = None,
date_stop: datetime | None = None,
drop_columns: typing.Tuple[str, ...] = DEFAULT_DROP_COLUMNS,
filters: str = "|> filter(fn:(r) => r._measurement == 'api_call')",
extra: str = "",
bucket: str = read_bucket,
):
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

if date_stop is None:
date_stop = now

# Influx throws an error for an empty range, so just return a list.
if date_start == "-0d" and date_stop == "now()":
if date_start == date_stop:
return []

query_api = influxdb_client.query_api()
drop_columns_input = str(list(drop_columns)).replace("'", '"')

query = (
f'from(bucket:"{bucket}")'
f" |> range(start: {date_start}, stop: {date_stop})"
f" |> range(start: {date_start.isoformat()}, stop: {date_stop.isoformat()})"
f" {filters}"
f" |> drop(columns: {drop_columns_input})"
f"{extra}"
Expand All @@ -108,14 +119,23 @@ def influx_query_manager(


def get_events_for_organisation(
organisation_id: id, date_start: str = "-30d", date_stop: str = "now()"
organisation_id: id,
date_start: datetime | None = None,
date_stop: datetime | None = None,
) -> int:
"""
Query influx db for usage for given organisation id
:param organisation_id: an id of the organisation to get usage for
:return: a number of request counts for organisation
"""
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

if date_stop is None:
date_stop = now

result = InfluxDBWrapper.influx_query_manager(
filters=build_filter_string(
[
Expand Down Expand Up @@ -145,7 +165,9 @@ def get_events_for_organisation(


def get_event_list_for_organisation(
organisation_id: int, date_start: str = "-30d", date_stop: str = "now()"
organisation_id: int,
date_start: datetime | None = None,
date_stop: datetime | None = None,
) -> tuple[dict[str, list[int]], list[str]]:
"""
Query influx db for usage for given organisation id
Expand All @@ -154,6 +176,13 @@ def get_event_list_for_organisation(
:return: a number of request counts for organisation in chart.js scheme
"""
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

if date_stop is None:
date_stop = now

results = InfluxDBWrapper.influx_query_manager(
filters=f'|> filter(fn:(r) => r._measurement == "api_call") \
|> filter(fn: (r) => r["organisation_id"] == "{organisation_id}")',
Expand All @@ -163,15 +192,12 @@ def get_event_list_for_organisation(
)
dataset = defaultdict(list)
labels = []

date_difference = date_stop - date_start
required_records = date_difference.days + 1
for result in results:
for record in result.records:
dataset[record["resource"]].append(record["_value"])
if date_stop == "now()":
required_records = abs(int(date_start[:-1])) + 1
else:
required_records = (
abs(int(date_start[:-1])) - abs(int(date_stop[:-1])) + 1
)
if len(labels) != required_records:
labels.append(record.values["_time"].strftime("%Y-%m-%d"))
return dataset, labels
Expand All @@ -181,8 +207,8 @@ def get_multiple_event_list_for_organisation(
organisation_id: int,
project_id: int = None,
environment_id: int = None,
date_start: str = "-30d",
date_stop: str = "now()",
date_start: datetime | None = None,
date_stop: datetime | None = None,
) -> list[UsageData]:
"""
Query influx db for usage for given organisation id
Expand All @@ -193,6 +219,13 @@ def get_multiple_event_list_for_organisation(
:return: a number of requests for flags, traits, identities, environment-document
"""
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

if date_stop is None:
date_stop = now

filters = [
'r._measurement == "api_call"',
f'r["organisation_id"] == "{organisation_id}"',
Expand Down Expand Up @@ -227,9 +260,16 @@ def get_usage_data(
organisation_id: int,
project_id: int | None = None,
environment_id: int | None = None,
date_start: str = "-30d",
date_stop: str = "now()",
date_start: datetime | None = None,
date_stop: datetime | None = None,
) -> list[UsageData]:
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

if date_stop is None:
date_stop = now

events_list = get_multiple_event_list_for_organisation(
organisation_id=organisation_id,
project_id=project_id,
Expand All @@ -243,7 +283,7 @@ def get_usage_data(
def get_multiple_event_list_for_feature(
environment_id: int,
feature_name: str,
date_start: str = "-30d",
date_start: datetime | None = None,
aggregate_every: str = "24h",
) -> list[dict]:
"""
Expand All @@ -264,11 +304,14 @@ def get_multiple_event_list_for_feature(
:param environment_id: an id of the environment to get usage for
:param feature_name: the name of the feature to get usage for
:param date_start: the influx time period to filter on, e.g. -30d, -7d, etc.
:param date_start: the influx datetime period to filter on
:param aggregate_every: the influx time period to aggregate the data by, e.g. 24h
:return: a list of dicts with feature and request count in a specific environment
"""
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
Expand Down Expand Up @@ -297,15 +340,20 @@ def get_multiple_event_list_for_feature(
def get_feature_evaluation_data(
    feature_name: str, environment_id: int, period: str = "30d"
) -> typing.List[FeatureEvaluationData]:
    """
    Load evaluation data for a feature in an environment over a trailing period.

    :param feature_name: the name of the feature to get usage for
    :param environment_id: an id of the environment to get usage for
    :param period: length of the trailing window, written as a whole number of
        days followed by "d", e.g. "30d"
    :return: the events returned by get_multiple_event_list_for_feature, loaded
        through FeatureEvaluationDataSchema
    :raises ValueError: if period is not of the form "<days>d"
    """
    # Validate explicitly instead of with `assert`, which is stripped when
    # Python runs with -O and would let a malformed period through.
    if not period.endswith("d"):
        raise ValueError(
            f"period must be a number of days such as '30d', got {period!r}"
        )
    days = int(period[:-1])  # raises ValueError for a non-numeric day count

    # The query helper takes an absolute datetime, so the relative period is
    # converted into a start timestamp here.
    date_start = timezone.now() - timedelta(days=days)
    data = get_multiple_event_list_for_feature(
        feature_name=feature_name,
        environment_id=environment_id,
        date_start=date_start,
    )
    return FeatureEvaluationDataSchema(many=True).load(data)


def get_top_organisations(date_start: str, limit: str = ""):
def get_top_organisations(
date_start: datetime | None = None, limit: str = ""
) -> dict[int, int]:
"""
Query influx db top used organisations
Expand All @@ -315,10 +363,14 @@ def get_top_organisations(date_start: str, limit: str = ""):
:return: top organisations in descending order based on api calls.
"""
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

if limit:
limit = f"|> limit(n:{limit})"

bucket = range_bucket_mappings[date_start]
bucket = get_range_bucket_mappings(date_start)
results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
bucket=bucket,
Expand All @@ -333,6 +385,7 @@ def get_top_organisations(date_start: str, limit: str = ""):
)

dataset = {}

for result in results:
for record in result.records:
try:
Expand All @@ -347,7 +400,9 @@ def get_top_organisations(date_start: str, limit: str = ""):
return dataset


def get_current_api_usage(organisation_id: int, date_start: str) -> int:
def get_current_api_usage(
organisation_id: int, date_start: datetime | None = None
) -> int:
"""
Query influx db for api usage
Expand All @@ -356,6 +411,9 @@ def get_current_api_usage(organisation_id: int, date_start: str) -> int:
:return: number of current api calls
"""
now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)

bucket = read_bucket
results = InfluxDBWrapper.influx_query_manager(
Expand Down

0 comments on commit 09866e3

Please sign in to comment.