Log event worker fixes (#295)
kat-statsig authored Aug 6, 2024
1 parent 5772f30 commit de5dabb
Showing 11 changed files with 563 additions and 43 deletions.
2 changes: 1 addition & 1 deletion statsig/__init__.py
@@ -11,6 +11,6 @@
from .statsig_environment_tier import StatsigEnvironmentTier
from .evaluator import _Evaluator
from .interface_data_store import IDataStore
from .sdk_flags import _SDKFlags
from .sdk_configs import _SDK_Configs
from .utils import HashingAlgorithm
from .version import __version__
44 changes: 28 additions & 16 deletions statsig/batch_event_queue.py
@@ -4,9 +4,10 @@
from typing import Dict, List, Deque

from . import globals
from .sdk_configs import _SDK_Configs
from .statsig_event import StatsigEvent
from .diagnostics import Context
from .statsig_options import StatsigOptions
from .statsig_options import StatsigOptions, DEFAULT_EVENT_QUEUE_SIZE
from .thread_util import spawn_background_thread, THREAD_JOIN_TIMEOUT


@@ -94,7 +95,8 @@ def add_event(self, event):
batched_event = None
with self._lock:
self._event_array.append(event)
if len(self._event_array) >= self._batch_size:
batch_size = self._check_batch_array_size_interval() or self._batch_size
if len(self._event_array) >= batch_size:
should_batch = True
batched_event = BatchEventLogs(
payload={
@@ -111,12 +113,13 @@ def add_event(self, event):
self.add_to_batched_events_queue(batched_event)

def get_all_batched_events(self):
self.batch_events()
with self._lock:
copy_events = list(self._batched_events_queue)
return copy_events

def shutdown(self):
self.batch_events()
self._send_and_reset_dropped_events_count()
if self._batching_thread is not None:
self._batching_thread.join(THREAD_JOIN_TIMEOUT)
if self._dropped_events_count_logging_thread is not None:
@@ -141,19 +144,20 @@ def _log_dropped_events_count(self, shutdown_event):
self._error_boundary.log_exception("_send_and_reset_dropped_events_count", e)

def _send_and_reset_dropped_events_count(self):
if self._dropped_events_count > 0:
dropped_event_count = self._dropped_events_count
message = (
f"Dropped {dropped_event_count} events due to log_event service outage"
)
self._error_boundary.log_exception(
"statsig::log_event_dropped_event_count",
Exception(message),
{"eventCount": self._dropped_events_count, "error": message},
bypass_dedupe=True
)
globals.logger.warning(message)
self._dropped_events_count = 0
with self._lock:
if self._dropped_events_count > 0:
dropped_event_count = self._dropped_events_count
message = (
f"Dropped {dropped_event_count} events due to log_event service outage"
)
self._error_boundary.log_exception(
"statsig::log_event_dropped_event_count",
Exception(message),
{"eventCount": self._dropped_events_count, "error": message},
bypass_dedupe=True
)
globals.logger.warning(message)
self._dropped_events_count = 0

def _add_diagnostics_event(self, context: Context):
if self._local_mode or not self._diagnostics.should_log_diagnostics(context):
@@ -169,3 +173,11 @@ def _add_diagnostics_event(self, context: Context):
event = StatsigEvent(None, _DIAGNOSTICS_EVENT)
event.metadata = metadata
self.add_event(event.to_dict())

def _check_batch_array_size_interval(self):
override_queue_size = _SDK_Configs.get_config_num_value("event_queue_size")
if override_queue_size is not None:
override_queue_size = int(override_queue_size)
if override_queue_size > 0 and override_queue_size != DEFAULT_EVENT_QUEUE_SIZE:
return override_queue_size
return None
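
Note on the new _check_batch_array_size_interval helper above: it lets a server-delivered "event_queue_size" config override the locally configured batch size, falling back to self._batch_size when no valid override is present. Below is a minimal standalone sketch of that resolution logic; the resolve_batch_size helper and the literal values are illustrative only, not part of the SDK.

    # Illustrative sketch only -- mirrors the override logic used in add_event above.
    from statsig.sdk_configs import _SDK_Configs

    def resolve_batch_size(local_batch_size, default_queue_size):
        override = _SDK_Configs.get_config_num_value("event_queue_size")
        if override is not None:
            override = int(override)
            # Only honor a positive override that differs from the SDK default.
            if override > 0 and override != default_queue_size:
                return override
        return local_batch_size

    _SDK_Configs.set_configs({"event_queue_size": 250})
    print(resolve_batch_size(local_batch_size=500, default_queue_size=500))  # -> 250
    _SDK_Configs.set_configs({})
    print(resolve_batch_size(local_batch_size=500, default_queue_size=500))  # -> 500
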
4 changes: 2 additions & 2 deletions statsig/http_worker.py
@@ -7,7 +7,7 @@
import requests
from .diagnostics import Diagnostics, Marker
from .interface_network import IStatsigNetworkWorker, NetworkProtocol
from .sdk_flags import _SDKFlags
from .sdk_configs import _SDK_Configs
from .statsig_options import StatsigOptions, STATSIG_API, STATSIG_CDN
from .statsig_error_boundary import _StatsigErrorBoundary

@@ -105,7 +105,7 @@ def get_id_list(self, on_complete, url, headers, log_on_exception=False):
on_complete(resp)

def log_events(self, payload, headers=None, log_on_exception=False, retry=0):
disable_compression = _SDKFlags.on("stop_log_event_compression")
disable_compression = _SDK_Configs.on("stop_log_event_compression")
additional_headers = {
'STATSIG-RETRY': str(retry),
}
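
The rename above keeps the behavior of the old _SDKFlags check: when the "stop_log_event_compression" flag is on, log_events is expected to skip compressing the payload. A hedged sketch of that decision follows; the encode_payload helper is hypothetical, and the actual request/compression code in http_worker.py sits outside this hunk.

    # Hypothetical helper illustrating the flag check; not the SDK's actual code path.
    import gzip
    import json

    from statsig.sdk_configs import _SDK_Configs

    def encode_payload(payload: dict) -> bytes:
        data = json.dumps(payload).encode("utf-8")
        if _SDK_Configs.on("stop_log_event_compression"):
            return data                # flag on: send the body uncompressed
        return gzip.compress(data)     # otherwise: compress the log_event body
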
25 changes: 25 additions & 0 deletions statsig/sdk_configs.py
@@ -0,0 +1,25 @@
from typing import Dict, Optional, Union, Any


class _SDK_Configs:
_flags: Dict[str, bool] = {}
_configs: Dict[str, Any] = {}

@staticmethod
def set_flags(new_flags):
_SDK_Configs._flags = new_flags

@staticmethod
def set_configs(new_configs):
_SDK_Configs._configs = new_configs

@staticmethod
def on(key):
return _SDK_Configs._flags.get(key, False) is True

@staticmethod
def get_config_num_value(config: str) -> Optional[Union[int, float]]:
value = _SDK_Configs._configs.get(config)
if isinstance(value, (int, float)):
return value
return None
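
_SDK_Configs replaces the deleted _SDKFlags module and carries numeric config values alongside the boolean flags. A quick usage sketch against the API defined above; the literal values are illustrative, and in production both dicts are populated from the downloaded config specs rather than set directly.

    from statsig.sdk_configs import _SDK_Configs

    _SDK_Configs.set_flags({"stop_log_event_compression": True})
    _SDK_Configs.set_configs({"event_queue_size": 250, "log_event_interval": 5})

    assert _SDK_Configs.on("stop_log_event_compression") is True
    assert _SDK_Configs.on("unknown_flag") is False                      # missing flags default to False
    assert _SDK_Configs.get_config_num_value("event_queue_size") == 250
    assert _SDK_Configs.get_config_num_value("unknown_config") is None   # missing or non-numeric -> None
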
13 changes: 0 additions & 13 deletions statsig/sdk_flags.py

This file was deleted.

7 changes: 5 additions & 2 deletions statsig/spec_store.py
@@ -4,7 +4,7 @@
from typing import List, Optional, Dict, Set

from .constants import Const
from .sdk_flags import _SDKFlags
from .sdk_configs import _SDK_Configs
from .spec_updater import SpecUpdater
from .utils import djb2_hash

@@ -59,6 +59,7 @@ def __init__(
statsig_metadata,
shutdown_event,
)

self.spec_updater.register_process_network_id_lists_listener(
lambda id_lists: self._process_download_id_lists(id_lists)
)
@@ -217,7 +218,9 @@ def parse_target_value_map_from_spec(spec, parsed):
self.init_reason = reason

flags = specs_json.get("sdk_flags", {})
_SDKFlags.set_flags(flags)
_SDK_Configs.set_flags(flags)
configs = specs_json.get("sdk_configs", {})
_SDK_Configs.set_configs(configs)

sampling_rate = specs_json.get("diagnostics", {})
self._diagnostics.set_sampling_rate(sampling_rate)
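
For context, the change above is where the new configs enter the process: each successful config-spec sync copies the "sdk_flags" and "sdk_configs" fields of the response into _SDK_Configs. A sketch of that data flow; the payload is reduced to just the fields read here, and real responses contain many more keys.

    from statsig.sdk_configs import _SDK_Configs

    # Reduced, illustrative specs_json; only the keys consumed above are shown.
    specs_json = {
        "sdk_flags": {"stop_log_event_compression": True},
        "sdk_configs": {"event_queue_size": 250, "log_event_interval": 5},
        "diagnostics": {},
    }

    _SDK_Configs.set_flags(specs_json.get("sdk_flags", {}))
    _SDK_Configs.set_configs(specs_json.get("sdk_configs", {}))
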
19 changes: 16 additions & 3 deletions statsig/statsig_logger_worker.py
@@ -1,6 +1,7 @@
import concurrent.futures

from . import globals
from .sdk_configs import _SDK_Configs
from .statsig_network import _StatsigNetwork
from .statsig_options import StatsigOptions
from .batch_event_queue import EventBatchProcessor, BatchEventLogs
@@ -21,7 +22,8 @@ def __init__(self, net: _StatsigNetwork, error_boundary, options: StatsigOptions
self._log_interval = globals.STATSIG_LOGGING_INTERVAL_SECONDS
self.backoff_interval = globals.STATSIG_LOGGING_INTERVAL_SECONDS
self.max_failure_backoff_interval = MAX_FAILURE_BACKOFF_INTERVAL_SECONDS
self.min_success_backoff_interval = min(MIN_SUCCESS_BACKOFF_INTERVAL_SECONDS, globals.STATSIG_LOGGING_INTERVAL_SECONDS)
self.min_success_backoff_interval = min(MIN_SUCCESS_BACKOFF_INTERVAL_SECONDS,
globals.STATSIG_LOGGING_INTERVAL_SECONDS)
self._local_mode = options.local_mode
self._error_boundary = error_boundary
self._diagnostics = diagnostics
@@ -55,12 +57,12 @@ def force_flush(self):
self._flush_to_server(batched_events)

def shutdown(self):
self.event_batch_processor.shutdown()
event_batches = self.event_batch_processor.get_all_batched_events()
for batch in event_batches:
self._flush_to_server(batch)
if self.worker_thread is not None:
self.worker_thread.join(THREAD_JOIN_TIMEOUT)
self.event_batch_processor.shutdown()
self._executor.shutdown()

def _process_queue(self, shutdown_event):
Expand All @@ -76,7 +78,7 @@ def _flush_to_server(self, batched_events: BatchEventLogs):
if self._local_mode:
return
res = self._net.log_events(batched_events.payload, retry=batched_events.retries,
log_on_exception=True, headers=batched_events.headers)
log_on_exception=True, headers=batched_events.headers)
if res is not None:
if batched_events.retries >= 10:
message = (
@@ -105,15 +107,26 @@ def _flush_to_server(self, batched_events: BatchEventLogs):
self._success_backoff()

def _failure_backoff(self):
if self._check_override_interval():
return
self.backoff_interval = min(self.backoff_interval * BACKOFF_MULTIPLIER,
self.max_failure_backoff_interval)
self._log_interval = self.backoff_interval
globals.logger.info(f"Log event failure, backing off for {self._log_interval} seconds")

def _success_backoff(self):
if self._check_override_interval():
return
if self._log_interval == globals.STATSIG_LOGGING_INTERVAL_SECONDS:
return
self.backoff_interval = max(self.backoff_interval / BACKOFF_MULTIPLIER,
self.min_success_backoff_interval)
self._log_interval = self.backoff_interval
globals.logger.info(f"Log event success, decreasing backoff to {self._log_interval} seconds")

def _check_override_interval(self):
override_interval = _SDK_Configs.get_config_num_value("log_event_interval")
if override_interval is not None and override_interval > 0:
self._log_interval = float(override_interval)
return True
return False
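
The new _check_override_interval short-circuits both backoff paths: when a positive "log_event_interval" config is present, the flush interval is pinned to it and the exponential failure/success adjustment is skipped. Below is a standalone sketch of the failure path; the next_interval_on_failure helper and the constant values are illustrative only.

    from statsig.sdk_configs import _SDK_Configs

    BACKOFF_MULTIPLIER = 2          # illustrative value
    MAX_FAILURE_BACKOFF = 120.0     # illustrative value

    def next_interval_on_failure(current_backoff: float) -> float:
        override = _SDK_Configs.get_config_num_value("log_event_interval")
        if override is not None and override > 0:
            return float(override)  # server override pins the interval, no backoff
        return min(current_backoff * BACKOFF_MULTIPLIER, MAX_FAILURE_BACKOFF)

    _SDK_Configs.set_configs({})
    print(next_interval_on_failure(10.0))           # -> 20.0 (exponential backoff)
    _SDK_Configs.set_configs({"log_event_interval": 5})
    print(next_interval_on_failure(10.0))           # -> 5.0 (override wins)
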
1 change: 1 addition & 0 deletions statsig/statsig_server.py
@@ -93,6 +93,7 @@ def _initialize_impl(self, sdk_key: str, options: Optional[StatsigOptions]):
sdk_key,
diagnostics
)

self._evaluator = _Evaluator(self._spec_store)

self._spec_store.initialize()
