Skip to content

Commit

Permalink
Prototype exposure aggregation (#222)
Browse files Browse the repository at this point in the history
For non-analytics gates, we need to be able to aggregate exposures and
batch up counts per group, rather than send an entire event for every
check. This adds that functionality to the logger.

Still to do: send down the type of gate in d_c_s, and use that to
determine whether to aggregate the exposures for a given check or not.
The logger class now has a parameter for that; we just need to pipe it
through.

---------

Co-authored-by: Brent Echols <[email protected]>
Co-authored-by: Stephen Royal <[email protected]>
  • Loading branch information
3 people authored Jan 18, 2024
1 parent 9feaf07 commit e9b8684
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 13 deletions.
6 changes: 5 additions & 1 deletion statsig/config_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ def __init__(self,
explicit_parameters=None,
is_experiment_group=False,
evaluation_details=None,
group_name=None):
group_name=None,
aggregate_exposures=False):
self.unsupported = unsupported is True
if boolean_value is None:
boolean_value = False
Expand All @@ -37,3 +38,6 @@ def __init__(self,
evaluation_details = EvaluationDetails(0, 0, EvaluationReason.unrecognized)
self.evaluation_details = evaluation_details
self.group_name = group_name
if aggregate_exposures is None:
aggregate_exposures = False
self.aggregate_exposures = aggregate_exposures
10 changes: 6 additions & 4 deletions statsig/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,19 @@ def __evaluate(self, user, config):
exposures = []
enabled = config.get("enabled", False)
default_value = config.get("defaultValue", {})
aggregate_exposures = config.get("aggregateExposures", False)
evaluation_details = self._create_evaluation_details(
self._spec_store.init_reason)
if not enabled:
return _ConfigEvaluation(False, False, default_value, "disabled", exposures,
evaluation_details=evaluation_details)
evaluation_details=evaluation_details, aggregate_exposures=aggregate_exposures)

for rule in config.get("rules", []):
result = self.__evaluate_rule(user, rule)
if result.unsupported:
return _ConfigEvaluation(True, False, default_value, "", exposures,
evaluation_details=self._create_evaluation_details(
EvaluationReason.unsupported))
EvaluationReason.unsupported), aggregate_exposures=aggregate_exposures)
if result.secondary_exposures is not None and len(
result.secondary_exposures) > 0:
exposures = exposures + result.secondary_exposures
Expand All @@ -231,11 +232,12 @@ def __evaluate(self, user, config):
exposures,
is_experiment_group=result.is_experiment_group,
evaluation_details=evaluation_details,
group_name=result.group_name
group_name=result.group_name,
aggregate_exposures=aggregate_exposures
)

return _ConfigEvaluation(False, False, default_value, "default", exposures,
evaluation_details=evaluation_details)
evaluation_details=evaluation_details, aggregate_exposures=aggregate_exposures)

def __evaluate_rule(self, user, rule):
exposures = []
Expand Down
17 changes: 17 additions & 0 deletions statsig/exposure_aggregation_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dataclasses import dataclass
from typing import Optional

@dataclass
class ExposureAggregationData:
    """Running tally of gate exposures that share a gate, rule and value."""

    # Name of the gate the tally belongs to.
    gate: Optional[str] = None
    # Identifier of the rule that produced the evaluation.
    rule_id: Optional[str] = None
    # Boolean result of the gate check being counted.
    value: Optional[bool] = None
    # Number of exposures recorded so far.
    count: int = 0

    def to_dict(self):
        """Return the tally as a plain dict suitable for event metadata."""
        field_names = ("gate", "rule_id", "value", "count")
        return {name: getattr(self, name) for name in field_names}
55 changes: 47 additions & 8 deletions statsig/statsig_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Optional, Union
from .retryable_logs import RetryableLogs
from .evaluation_details import EvaluationDetails
from .exposure_aggregation_data import ExposureAggregationData
from .config_evaluation import _ConfigEvaluation
from .statsig_event import StatsigEvent
from .layer import Layer
Expand Down Expand Up @@ -35,7 +36,7 @@ def _safe_add_evaluation_to_event(
class _StatsigLogger:
_background_flush: Optional[threading.Thread]
_background_retry: Optional[threading.Thread]
_background_deduper: Optional[threading.Thread]
_background_exposure_handler: Optional[threading.Thread]

def __init__(self, net, shutdown_event, statsig_metadata, error_boundary, options):
self._events = []
Expand All @@ -52,10 +53,11 @@ def __init__(self, net, shutdown_event, statsig_metadata, error_boundary, option
self._shutdown_event = shutdown_event
self._background_flush = None
self._background_retry = None
self._background_deduper = None
self._background_exposure_handler = None
self.spawn_bg_threads_if_needed()
self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
self._futures = collections.deque(maxlen=10)
self._exposure_aggregation_data = collections.defaultdict(ExposureAggregationData)

def spawn_bg_threads_if_needed(self):
if self._local_mode:
Expand All @@ -77,10 +79,10 @@ def spawn_bg_threads_if_needed(self):
self._error_boundary,
)

if self._background_deduper is None or not self._background_deduper.is_alive():
self._background_deduper = spawn_background_thread(
"logger_background_deduper",
self._periodic_dedupe_clear,
if self._background_exposure_handler is None or not self._background_exposure_handler.is_alive():
self._background_exposure_handler = spawn_background_thread(
"logger_background_exposure_handler",
self._periodic_exposure_reset,
(self._shutdown_event,),
self._error_boundary,
)
Expand All @@ -101,7 +103,15 @@ def log_gate_exposure(
secondary_exposures,
evaluation_details: EvaluationDetails,
is_manual_exposure=False,
aggregate=False,
):
if aggregate:
aggregated = self._aggregate_exposure(value, gate, rule_id)
if aggregated:
return
# even when the gate should only send aggregated exposures
# we send the first exposure for a given rule/evaluation
# so you get at least some diagnostic info
event = StatsigEvent(user, _GATE_EXPOSURE_EVENT)
event.metadata = {
"gate": gate,
Expand Down Expand Up @@ -183,6 +193,20 @@ def log_layer_exposure(

self.log(event)

def _aggregate_exposure(self, value, gate, rule_id):
    """Record one exposure in the bucket for this gate/rule/value.

    The bucket is looked up (and created on first use) in
    ``self._exposure_aggregation_data`` under the key
    ``"<gate>::<rule_id>::<value>"``.

    Returns:
        True when the bucket had already counted at least one exposure
        before this call, False when this is the first one recorded.
    """
    bucket = self._exposure_aggregation_data[f"{gate}::{rule_id}::{value}"]

    # Only fill identifying fields that are still unset (falsy); a field
    # that already holds a truthy value is left untouched.
    bucket.gate = bucket.gate or gate
    bucket.rule_id = bucket.rule_id or rule_id
    bucket.value = bucket.value or value

    bucket.count += 1
    return bucket.count >= 2

def flush_in_background(self):
event_count = len(self._events)
if event_count == 0:
Expand Down Expand Up @@ -212,6 +236,7 @@ def _flush_to_server(self, events_copy, event_count):

def flush(self):
self._add_diagnostics_api_call_event()
self._flush_aggregated_data()
event_count = len(self._events)
if event_count == 0:
return
Expand Down Expand Up @@ -255,14 +280,28 @@ def _periodic_flush(self, shutdown_event):
except Exception as e:
self._error_boundary.log_exception("_periodic_flush", e)

def _periodic_dedupe_clear(self, shutdown_event):
def _periodic_exposure_reset(self, shutdown_event):
while True:
try:
if shutdown_event.wait(self._logging_interval):
break
self._flush_aggregated_data()
self._exposure_aggregation_data = collections.defaultdict(ExposureAggregationData)
self._deduper = set()
except Exception as e:
self._error_boundary.log_exception("_periodic_dedupe_clear", e)
self._error_boundary.log_exception("_periodic_exposure_reset", e)

def _flush_aggregated_data(self):
    """Emit one aggregation event per bucket and start a fresh window.

    The current mapping is detached from ``self`` and replaced with an
    empty one before anything is logged. This makes the method safe to
    call repeatedly: a bucket is reported at most once, so a second call
    can neither re-send counts that were already reported nor drive a
    count negative. The loop also walks the detached mapping rather than
    the one that new exposures are recorded into.

    Each event reports ``count - 1``, because the first exposure of every
    bucket is sent as a full gate exposure event. Buckets with nothing
    beyond that first exposure are skipped.
    """
    pending = self._exposure_aggregation_data
    self._exposure_aggregation_data = collections.defaultdict(ExposureAggregationData)

    for data in pending.values():
        # remove the offset for the first sampled exposure, without
        # mutating the record itself
        aggregated_count = data.count - 1
        if aggregated_count <= 0:
            # nothing to see here - we never got enough exposures
            # to aggregate anything
            continue
        metadata = data.to_dict()
        metadata["count"] = aggregated_count
        event = StatsigEvent(None, "statsig::exposure_aggregation")
        event.metadata = metadata
        self.log(event)

def _periodic_retry(self, shutdown_event):
while True:
Expand Down
2 changes: 2 additions & 0 deletions statsig/statsig_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def manually_log_gate_exposure(self, user: StatsigUser, gate_name: str):
result.secondary_exposures,
result.evaluation_details,
is_manual_exposure=True,
aggregate=result.aggregate_exposures,
)

def get_config(self, user: StatsigUser, config_name: str, log_exposure=True):
Expand Down Expand Up @@ -406,6 +407,7 @@ def __check_gate(self, user: StatsigUser, gate_name: str, log_exposure=True):
result.rule_id,
result.secondary_exposures,
result.evaluation_details,
aggregate=result.aggregate_exposures,
)
return result

Expand Down

0 comments on commit e9b8684

Please sign in to comment.