From 5772f30907081a04b69d50d773c437cbfb360729 Mon Sep 17 00:00:00 2001
From: kat-statsig <167801639+kat-statsig@users.noreply.github.com>
Date: Mon, 5 Aug 2024 10:53:26 -0700
Subject: [PATCH] Rework log event to use worker + event queue (#282)

A concurrency + flushing test in main is failing; it needs to be updated in
the [kong pr](https://github.com/statsig-io/kong/pull/2335).
---
 statsig/batch_event_queue.py     | 171 ++++++++++++++++++++++++
 statsig/globals.py               |   3 +
 statsig/retryable_logs.py        |   8 --
 statsig/statsig_logger.py        | 219 ++++++-------------------------
 statsig/statsig_logger_worker.py | 119 +++++++++++++++++
 statsig/statsig_options.py       |  15 ++-
 tests/test_bg_thread_spawning.py |  12 +-
 tests/test_logger.py             |  12 +-
 tests/test_logger_worker.py      |  78 +++++++++++
 9 files changed, 435 insertions(+), 202 deletions(-)
 create mode 100644 statsig/batch_event_queue.py
 delete mode 100644 statsig/retryable_logs.py
 create mode 100644 statsig/statsig_logger_worker.py
 create mode 100644 tests/test_logger_worker.py

diff --git a/statsig/batch_event_queue.py b/statsig/batch_event_queue.py
new file mode 100644
index 0000000..3aa6a87
--- /dev/null
+++ b/statsig/batch_event_queue.py
@@ -0,0 +1,171 @@
+import threading
+from collections import deque
+from dataclasses import dataclass
+from typing import Dict, List, Deque
+
+from . import globals
+from .statsig_event import StatsigEvent
+from .diagnostics import Context
+from .statsig_options import StatsigOptions
+from .thread_util import spawn_background_thread, THREAD_JOIN_TIMEOUT
+
+
+@dataclass
+class BatchEventLogs:
+    payload: dict
+    headers: dict
+    event_count: int
+    retries: int = 0
+
+
+_DIAGNOSTICS_EVENT = "statsig::diagnostics"
+
+
+class EventBatchProcessor:
+    def __init__(self, options: StatsigOptions, statsig_metadata: dict, shutdown_event, error_boundary, diagnostics):
+        self._local_mode = options.local_mode
+        self._diagnostics = diagnostics
+        self._lock = threading.Lock()
+        self._batch_size = options.event_queue_size
+        self._event_array: List[Dict] = []
+        self._batched_events_queue: Deque[BatchEventLogs] = deque(maxlen=options.retry_queue_size)
+        self._statsig_metadata = statsig_metadata
+        self._shutdown_event = shutdown_event
+        self._batching_interval = globals.STATSIG_BATCHING_INTERVAL_SECONDS
+        self._error_boundary = error_boundary
+        self._batching_thread = None
+        self._dropped_events_count = 0
+        self._dropped_events_count_logging_thread = None
+        self.spawn_bg_threads_if_needed()
+
+    def add_to_batched_events_queue(self, batched_events):
+        with self._lock:
+            if self._batched_events_queue.maxlen is not None and len(
+                    self._batched_events_queue) >= self._batched_events_queue.maxlen:
+                self._dropped_events_count += self._batched_events_queue[0].event_count
+            self._batched_events_queue.append(batched_events)
+
+    def get_batched_event(self):
+        with self._lock:
+            if len(self._batched_events_queue) > 0:
+                return self._batched_events_queue.popleft()
+            return None
+
+    def spawn_bg_threads_if_needed(self):
+        if self._local_mode:
+            return
+        if self._batching_thread is None or not self._batching_thread.is_alive():
+            self._batching_thread = spawn_background_thread(
+                "logger_worker_batch_event_thread",
+                self._batch_events_on_interval,
+                (self._shutdown_event,),
+                self._error_boundary,
+            )
+        if self._dropped_events_count_logging_thread is None or not self._dropped_events_count_logging_thread.is_alive():
+            self._dropped_events_count_logging_thread = spawn_background_thread(
+                "logger_worker_log_dropped_events_thread",
+                self._log_dropped_events_count,
+                (self._shutdown_event,),
+                self._error_boundary,
+            )
+
+    def batch_events(self, add_to_queue=True):
+        batched_event = None
+        self._add_diagnostics_event(Context.API_CALL)
+        self._add_diagnostics_event(Context.LOG_EVENT)
+        with self._lock:
+            if len(self._event_array) > 0:
+                batched_event = BatchEventLogs(
+                    payload={
+                        "events": self._event_array.copy(),
+                        "statsigMetadata": self._statsig_metadata
+                    },
+                    headers={"STATSIG-EVENT-COUNT": str(len(self._event_array))},
+                    event_count=len(self._event_array),
+                    retries=0
+                )
+                self._event_array.clear()
+        if batched_event is not None and add_to_queue:
+            self.add_to_batched_events_queue(batched_event)
+        return batched_event
+
+    def add_event(self, event):
+        should_batch = False
+        batched_event = None
+        with self._lock:
+            self._event_array.append(event)
+            if len(self._event_array) >= self._batch_size:
+                should_batch = True
+                batched_event = BatchEventLogs(
+                    payload={
+                        "events": self._event_array.copy(),
+                        "statsigMetadata": self._statsig_metadata
+                    },
+                    headers={"STATSIG-EVENT-COUNT": str(len(self._event_array))},
+                    event_count=len(self._event_array),
+                    retries=0
+                )
+                self._event_array.clear()
+
+        if should_batch and batched_event is not None:
+            self.add_to_batched_events_queue(batched_event)
+
+    def get_all_batched_events(self):
+        with self._lock:
+            copy_events = list(self._batched_events_queue)
+            return copy_events
+
+    def shutdown(self):
+        self.batch_events()
+        if self._batching_thread is not None:
+            self._batching_thread.join(THREAD_JOIN_TIMEOUT)
+        if self._dropped_events_count_logging_thread is not None:
+            self._dropped_events_count_logging_thread.join(THREAD_JOIN_TIMEOUT)
+
+    def _batch_events_on_interval(self, shutdown_event):
+        while True:
+            try:
+                if shutdown_event.wait(self._batching_interval):
+                    break
+                self.batch_events()
+            except Exception as e:
+                self._error_boundary.log_exception("_batch_events_on_interval", e)
+
+    def _log_dropped_events_count(self, shutdown_event):
+        while True:
+            try:
+                if shutdown_event.wait(self._batching_interval):
+                    break
+                self._send_and_reset_dropped_events_count()
+            except Exception as e:
+                self._error_boundary.log_exception("_send_and_reset_dropped_events_count", e)
+
+    def _send_and_reset_dropped_events_count(self):
+        if self._dropped_events_count > 0:
+            dropped_event_count = self._dropped_events_count
+            message = (
+                f"Dropped {dropped_event_count} events due to log_event service outage"
+            )
+            self._error_boundary.log_exception(
+                "statsig::log_event_dropped_event_count",
+                Exception(message),
+                {"eventCount": self._dropped_events_count, "error": message},
+                bypass_dedupe=True
+            )
+            globals.logger.warning(message)
+            self._dropped_events_count = 0
+
+    def _add_diagnostics_event(self, context: Context):
+        if self._local_mode or not self._diagnostics.should_log_diagnostics(context):
+            return
+        markers = self._diagnostics.get_markers(context)
+        self._diagnostics.clear_context(context)
+        if len(markers) == 0:
+            return
+        metadata = {
+            "markers": [marker.to_dict() for marker in markers],
+            "context": context,
+        }
+        event = StatsigEvent(None, _DIAGNOSTICS_EVENT)
+        event.metadata = metadata
+        self.add_event(event.to_dict())
diff --git a/statsig/globals.py b/statsig/globals.py
index 64936ff..ca2809f 100644
--- a/statsig/globals.py
+++ b/statsig/globals.py
@@ -1,5 +1,8 @@
 from .output_logger import OutputLogger

+STATSIG_BATCHING_INTERVAL_SECONDS = 60.0
+STATSIG_LOGGING_INTERVAL_SECONDS = 5.0
+
 logger = OutputLogger('statsig.sdk')

diff --git a/statsig/retryable_logs.py b/statsig/retryable_logs.py
deleted file mode 100644
index b404f06..0000000
--- a/statsig/retryable_logs.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from dataclasses import dataclass
-
-@dataclass
-class RetryableLogs:
-    payload: str
-    headers: dict
-    event_count: int
-    retries: int = 0
diff --git a/statsig/statsig_logger.py b/statsig/statsig_logger.py
index 29aebe1..56c16ea 100644
--- a/statsig/statsig_logger.py
+++ b/statsig/statsig_logger.py
@@ -1,19 +1,17 @@
-import collections
-import concurrent.futures
 import threading
-from concurrent.futures import Future
-from typing import Optional, Union, Deque, Set, List
+from typing import Optional, Union, Set, List

+from .statsig_logger_worker import LoggerWorker
 from .statsig_network import _StatsigNetwork
-from .retryable_logs import RetryableLogs
+from .batch_event_queue import EventBatchProcessor
 from .evaluation_details import EvaluationDetails
 from .config_evaluation import _ConfigEvaluation
 from .statsig_event import StatsigEvent
 from .layer import Layer
 from . import globals
-from .thread_util import spawn_background_thread, THREAD_JOIN_TIMEOUT
-from .diagnostics import Diagnostics, Context
+from .thread_util import spawn_background_thread
+from .diagnostics import Diagnostics

 _CONFIG_EXPOSURE_EVENT = "statsig::config_exposure"
 _LAYER_EXPOSURE_EVENT = "statsig::layer_exposure"
@@ -24,7 +22,7 @@ def _safe_add_evaluation_to_event(
-    evaluation_details: Union[EvaluationDetails, None], event: StatsigEvent
+        evaluation_details: Union[EvaluationDetails, None], event: StatsigEvent
 ):
     if evaluation_details is None or event is None or event.metadata is None:
         return
@@ -36,52 +34,32 @@ class _StatsigLogger:
-    _background_flush: Optional[threading.Thread]
-    _background_retry: Optional[threading.Thread]
     _background_exposure_handler: Optional[threading.Thread]

-    def __init__(self, net: _StatsigNetwork, shutdown_event, statsig_metadata, error_boundary, options, diagnostics: Diagnostics):
+    def __init__(self, net: _StatsigNetwork, shutdown_event, statsig_metadata, error_boundary, options,
+                 diagnostics: Diagnostics):
         self._events: List[StatsigEvent] = []
-        self._retry_logs: Deque[RetryableLogs] = collections.deque(maxlen=10)
         self._deduper: Set[str] = set()
         self._net = net
         self._statsig_metadata = statsig_metadata
         self._local_mode = options.local_mode
         self._disabled = options.disable_all_logging
         self._console_logger = globals.logger
-        self._logging_interval = options.logging_interval
-        self._retry_interval = options.logging_interval
-        self._event_queue_size = options.event_queue_size
+        self._logging_interval = globals.STATSIG_BATCHING_INTERVAL_SECONDS
         self._error_boundary = error_boundary
         self._shutdown_event = shutdown_event
-        self._background_flush = None
-        self._background_retry = None
         self._background_exposure_handler = None
-        self.spawn_bg_threads_if_needed()
-        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
-        self._futures: Deque[Future] = collections.deque(maxlen=10)
         self._diagnostics = diagnostics
+        event_batch_processor = EventBatchProcessor(options, statsig_metadata, shutdown_event, error_boundary, diagnostics)
+        self.event_batch_processor = event_batch_processor
+        self._logger_worker = LoggerWorker(net, error_boundary, options, statsig_metadata, shutdown_event, diagnostics,
+                                           event_batch_processor)
+        self.spawn_bg_threads_if_needed()

     def spawn_bg_threads_if_needed(self):
         if self._local_mode:
             return
-        if self._background_flush is None or not self._background_flush.is_alive():
-            self._background_flush = spawn_background_thread(
-                "logger_background_flush",
-                self._periodic_flush,
-                (self._shutdown_event,),
-                self._error_boundary,
-            )
-
-        if self._background_retry is None or not self._background_retry.is_alive():
-            self._background_retry = spawn_background_thread(
-                "logger_background_retry",
-                self._periodic_retry,
-                (self._shutdown_event,),
-                self._error_boundary,
-            )
-
         if self._background_exposure_handler is None or not self._background_exposure_handler.is_alive():
             self._background_exposure_handler = spawn_background_thread(
                 "logger_background_exposure_handler",
@@ -90,22 +68,22 @@ def spawn_bg_threads_if_needed(self):
                 self._error_boundary,
             )

+        self._logger_worker.spawn_bg_threads_if_needed()
+
     def log(self, event):
         if self._local_mode or self._disabled:
             return
-        self._events.append(event.to_dict())
-        if len(self._events) >= self._event_queue_size:
-            self.flush_in_background()
+        self.event_batch_processor.add_event(event.to_dict())

     def log_gate_exposure(
-        self,
-        user,
-        gate,
-        value,
-        rule_id,
-        secondary_exposures,
-        evaluation_details: EvaluationDetails,
-        is_manual_exposure=False,
+            self,
+            user,
+            gate,
+            value,
+            rule_id,
+            secondary_exposures,
+            evaluation_details: EvaluationDetails,
+            is_manual_exposure=False,
     ):
         event = StatsigEvent(user, _GATE_EXPOSURE_EVENT)
         event.metadata = {
@@ -127,13 +105,13 @@ def log_gate_exposure(
         self.log(event)

     def log_config_exposure(
-        self,
-        user,
-        config,
-        rule_id,
-        secondary_exposures,
-        evaluation_details: EvaluationDetails,
-        is_manual_exposure=False,
+            self,
+            user,
+            config,
+            rule_id,
+            secondary_exposures,
+            evaluation_details: EvaluationDetails,
+            is_manual_exposure=False,
     ):
         event = StatsigEvent(user, _CONFIG_EXPOSURE_EVENT)
         event.metadata = {
@@ -153,12 +131,12 @@ def log_config_exposure(
         self.log(event)

     def log_layer_exposure(
-        self,
-        user,
-        layer: Layer,
-        parameter_name: str,
-        config_evaluation: _ConfigEvaluation,
-        is_manual_exposure=False,
+            self,
+            user,
+            layer: Layer,
+            parameter_name: str,
+            config_evaluation: _ConfigEvaluation,
+            is_manual_exposure=False,
     ):
         event = StatsigEvent(user, _LAYER_EXPOSURE_EVENT)
@@ -188,72 +166,11 @@ def log_layer_exposure(

         self.log(event)

-    def flush_in_background(self):
-        event_count = len(self._events)
-        if event_count == 0:
-            return
-        events_copy = self._events.copy()
-        self._events = []
-
-        self._run_on_background_thread(
-            lambda: self._flush_to_server(events_copy, event_count)
-        )
-
-    def _flush_to_server(self, events_copy, event_count):
-        headers = {"STATSIG-EVENT-COUNT": str(event_count)}
-
-        res = self._net.log_events({
-            "events": events_copy,
-            "statsigMetadata": self._statsig_metadata,
-        }, log_on_exception=True, headers=headers)
-        if res is not None:
-            self._retry_logs.append(RetryableLogs(res, headers, event_count, 0))
-
     def flush(self):
-        self._add_diagnostics_event(Context.API_CALL)
-        self._add_diagnostics_event(Context.LOG_EVENT)
-        event_count = len(self._events)
-        if event_count == 0:
-            return
-        events_copy = self._events.copy()
-        self._events = []
-        self._flush_to_server(events_copy, event_count)
+        self._logger_worker.force_flush()

     def shutdown(self):
-        self.flush()
-
-        if self._background_flush is not None:
-            self._background_flush.join(THREAD_JOIN_TIMEOUT)
-
-        if self._background_retry is not None:
-            self._background_retry.join(THREAD_JOIN_TIMEOUT)
-
-        concurrent.futures.wait(self._futures, timeout=THREAD_JOIN_TIMEOUT)
-        self._futures.clear()
-        self._executor.shutdown()
-
-    def _run_on_background_thread(self, closure):
-        if self._shutdown_event.is_set():
-            return
-        future = self._executor.submit(closure)
-        self._futures.append(future)
-
-    def _flush_futures(self):
-        for future in concurrent.futures.as_completed(
-                self._futures, timeout=THREAD_JOIN_TIMEOUT
-        ):
-            if future in self._futures:
-                self._futures.remove(future)
-
-    def _periodic_flush(self, shutdown_event):
-        while True:
-            try:
-                if shutdown_event.wait(self._logging_interval):
-                    break
-                self.flush()
-                self._flush_futures()
-            except Exception as e:
-                self._error_boundary.log_exception("_periodic_flush", e)
+        self._logger_worker.shutdown()

     def _periodic_exposure_reset(self, shutdown_event):
         while True:
@@ -264,47 +181,6 @@ def _periodic_exposure_reset(self, shutdown_event):
             except Exception as e:
                 self._error_boundary.log_exception("_periodic_exposure_reset", e)

-    def _periodic_retry(self, shutdown_event):
-        while True:
-            if shutdown_event.wait(self._retry_interval):
-                break
-            length = len(self._retry_logs)
-            for _i in range(length):
-                try:
-                    retry_logs = self._retry_logs.pop()
-                    retry_logs.retries += 1
-                except IndexError:
-                    break
-
-                res = self._net.log_events(
-                    retry_logs.payload,
-                    log_on_exception=True,
-                    retry=retry_logs.retries,
-                    headers=retry_logs.headers,
-                )
-                if res is not None:
-                    if retry_logs.retries >= 10:
-                        message = (
-                            f"Failed to post {retry_logs.event_count} logs after 10 retries, dropping the request"
-                        )
-                        self._error_boundary.log_exception(
-                            "statsig::log_event_failed",
-                            Exception(message),
-                            {"eventCount": retry_logs.event_count, "error": message},
-                            bypass_dedupe = True
-                        )
-                        self._console_logger.warning(message)
-                        return
-
-                    self._retry_logs.append(
-                        RetryableLogs(
-                            retry_logs.payload,
-                            retry_logs.headers,
-                            retry_logs.event_count,
-                            retry_logs.retries,
-                        )
-                    )
-
     def log_diagnostics_event(self, metadata):
         event = StatsigEvent(None, _DIAGNOSTICS_EVENT)
         event.metadata = metadata
@@ -336,18 +212,3 @@ def _is_unique_exposure(self, user, eventName: str, metadata: Optional[dict]) -> bool:
             self._deduper.add(key)

         return True
-
-    def _add_diagnostics_event(self, context: Context):
-        if self._local_mode or not self._diagnostics.should_log_diagnostics(context):
-            return
-        markers = self._diagnostics.get_markers(context)
-        self._diagnostics.clear_context(context)
-        if len(markers) == 0:
-            return
-        metadata = {
-            "markers": [marker.to_dict() for marker in markers],
-            "context": context,
-        }
-        event = StatsigEvent(None, _DIAGNOSTICS_EVENT)
-        event.metadata = metadata
-        self._events.append(event.to_dict())
diff --git a/statsig/statsig_logger_worker.py b/statsig/statsig_logger_worker.py
new file mode 100644
index 0000000..59a982c
--- /dev/null
+++ b/statsig/statsig_logger_worker.py
@@ -0,0 +1,119 @@
+import concurrent.futures
+
+from . import globals
+from .statsig_network import _StatsigNetwork
+from .statsig_options import StatsigOptions
+from .batch_event_queue import EventBatchProcessor, BatchEventLogs
+from .diagnostics import Diagnostics
+from .thread_util import spawn_background_thread, THREAD_JOIN_TIMEOUT
+
+BACKOFF_MULTIPLIER = 2.0
+
+MAX_FAILURE_BACKOFF_INTERVAL_SECONDS = 120.0
+MIN_SUCCESS_BACKOFF_INTERVAL_SECONDS = 5.0
+
+
+class LoggerWorker:
+    def __init__(self, net: _StatsigNetwork, error_boundary, options: StatsigOptions, statsig_metadata, shutdown_event,
+                 diagnostics: Diagnostics, event_batch_processor: EventBatchProcessor):
+        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
+        self._statsig_metadata = statsig_metadata
+        self._log_interval = globals.STATSIG_LOGGING_INTERVAL_SECONDS
+        self.backoff_interval = globals.STATSIG_LOGGING_INTERVAL_SECONDS
+        self.max_failure_backoff_interval = MAX_FAILURE_BACKOFF_INTERVAL_SECONDS
+        self.min_success_backoff_interval = min(MIN_SUCCESS_BACKOFF_INTERVAL_SECONDS, globals.STATSIG_LOGGING_INTERVAL_SECONDS)
+        self._local_mode = options.local_mode
+        self._error_boundary = error_boundary
+        self._diagnostics = diagnostics
+        self._shutdown_event = shutdown_event
+        self._net = net
+        self.event_batch_processor = event_batch_processor
+        self.worker_thread = None
+        self.spawn_bg_threads_if_needed()
+
+    def spawn_bg_threads_if_needed(self):
+        if self._local_mode:
+            return
+        if self.worker_thread is None or not self.worker_thread.is_alive():
+            self.worker_thread = spawn_background_thread(
+                "logger_worker_thread",
+                self._process_queue,
+                (self._shutdown_event,),
+                self._error_boundary,
+            )
+
+        self.event_batch_processor.spawn_bg_threads_if_needed()
+
+    def flush_at_interval(self):
+        batched_events = self.event_batch_processor.get_batched_event()
+        if batched_events is not None:
+            self._flush_to_server(batched_events)
+
+    def force_flush(self):
+        batched_events = self.event_batch_processor.batch_events(add_to_queue=False)
+        if batched_events is not None:
+            self._flush_to_server(batched_events)
+
+    def shutdown(self):
+        self.event_batch_processor.shutdown()
+        event_batches = self.event_batch_processor.get_all_batched_events()
+        for batch in event_batches:
+            self._flush_to_server(batch)
+        if self.worker_thread is not None:
+            self.worker_thread.join(THREAD_JOIN_TIMEOUT)
+        self._executor.shutdown()
+
+    def _process_queue(self, shutdown_event):
+        while True:
+            try:
+                if shutdown_event.wait(self._log_interval):
+                    break
+                self.flush_at_interval()
+            except Exception as e:
+                self._error_boundary.log_exception("_process_queue", e)
+
+    def _flush_to_server(self, batched_events: BatchEventLogs):
+        if self._local_mode:
+            return
+        res = self._net.log_events(batched_events.payload, retry=batched_events.retries,
+                                   log_on_exception=True, headers=batched_events.headers)
+        if res is not None:
+            if batched_events.retries >= 10:
+                message = (
+                    f"Failed to post {batched_events.event_count} logs after 10 retries, dropping the request"
+                )
+                self._error_boundary.log_exception(
+                    "statsig::log_event_failed",
+                    Exception(message),
+                    {"eventCount": batched_events.event_count, "error": message},
+                    bypass_dedupe=True
+                )
+                globals.logger.warning(message)
+                return
+
+            self._failure_backoff()
+
+            self.event_batch_processor.add_to_batched_events_queue(
+                BatchEventLogs(
+                    batched_events.payload,
+                    batched_events.headers,
+                    batched_events.event_count,
+                    batched_events.retries + 1,
+                )
+            )
+        else:
+            self._success_backoff()
+
+    def _failure_backoff(self):
+        self.backoff_interval = min(self.backoff_interval * BACKOFF_MULTIPLIER,
+                                    self.max_failure_backoff_interval)
+        self._log_interval = self.backoff_interval
+        globals.logger.info(f"Log event failure, backing off for {self._log_interval} seconds")
+
+    def _success_backoff(self):
+        if self._log_interval == globals.STATSIG_LOGGING_INTERVAL_SECONDS:
+            return
+        self.backoff_interval = max(self.backoff_interval / BACKOFF_MULTIPLIER,
+                                    self.min_success_backoff_interval)
+        self._log_interval = self.backoff_interval
+        globals.logger.info(f"Log event success, decreasing backoff to {self._log_interval} seconds")
diff --git a/statsig/statsig_options.py b/statsig/statsig_options.py
index f502a3c..4194e36 100644
--- a/statsig/statsig_options.py
+++ b/statsig/statsig_options.py
@@ -15,6 +15,7 @@
 DEFAULT_EVENT_QUEUE_SIZE = 500
 DEFAULT_IDLISTS_THREAD_LIMIT = 3
 DEFAULT_LOGGING_INTERVAL = 60
+DEFAULT_RETRY_QUEUE_SIZE = 10

 STATSIG_API = "https://statsigapi.net/v1/"
 STATSIG_CDN = "https://api.statsigcdn.com/v1/"
@@ -75,14 +76,13 @@ def __init__(
             event_queue_size: Optional[int] = DEFAULT_EVENT_QUEUE_SIZE,
             data_store: Optional[IDataStore] = None,
             idlists_thread_limit: int = DEFAULT_IDLISTS_THREAD_LIMIT,
-            logging_interval: int = DEFAULT_LOGGING_INTERVAL,
+            logging_interval: int = DEFAULT_LOGGING_INTERVAL,  # deprecated
             disable_diagnostics: bool = False,
             custom_logger: Optional[OutputLogger] = None,
-            enable_debug_logs=False,
-            disable_all_logging=False,
-            evaluation_callback: Optional[
-                Callable[[Union[Layer, DynamicConfig, FeatureGate]], None]
-            ] = None,
+            enable_debug_logs = False,
+            disable_all_logging = False,
+            evaluation_callback: Optional[Callable[[Union[Layer, DynamicConfig, FeatureGate]], None]] = None,
+            retry_queue_size: int = DEFAULT_RETRY_QUEUE_SIZE,
             proxy_configs: Optional[Dict[NetworkEndpoint, ProxyConfig]] = None,
             fallback_to_statsig_api: Optional[bool] = False,
             initialize_sources: Optional[List[DataSource]] = None,
@@ -122,6 +122,7 @@ def __init__(
         self.enable_debug_logs = enable_debug_logs
         self.disable_all_logging = disable_all_logging
         self.evaluation_callback = evaluation_callback
+        self.retry_queue_size = retry_queue_size
         self.fallback_to_statsig_api = fallback_to_statsig_api
         self._set_logging_copy()
         if proxy_configs is None:
@@ -178,4 +179,6 @@ def _set_logging_copy(self):
             logging_copy["disable_diagnostics"] = self.disable_diagnostics
         if self.event_queue_size != DEFAULT_EVENT_QUEUE_SIZE:
             logging_copy["event_queue_size"] = self.event_queue_size
+        if self.retry_queue_size != DEFAULT_RETRY_QUEUE_SIZE:
+            logging_copy["retry_queue_size"] = self.retry_queue_size
         self.logging_copy = logging_copy
diff --git a/tests/test_bg_thread_spawning.py b/tests/test_bg_thread_spawning.py
index cd22900..e35b73e 100644
--- a/tests/test_bg_thread_spawning.py
+++ b/tests/test_bg_thread_spawning.py
@@ -60,13 +60,11 @@ def test_spec_store_dead_threads_restart(self, mock_request):

     def _logger_none_restart_test(self, actions: List[Callable]):
         for action in actions:
-            self._server._logger._background_flush = None
-            self._server._logger._background_retry = None
+            self._server._logger._logger_worker.worker_thread = None

             action()

-            self.assertIsNotNone(self._server._logger._background_flush)
-            self.assertIsNotNone(self._server._logger._background_retry)
+            self.assertIsNotNone(self._server._logger._logger_worker.worker_thread)

     def _logger_local_mode_restart_test(self, actions: List[Callable]):
         for action in actions:
@@ -84,13 +82,11 @@ def always_false():
             return False

         for action in actions:
-            self._server._logger._background_flush.is_alive = always_false
-            self._server._logger._background_retry.is_alive = always_false
+            self._server._logger._logger_worker.worker_thread.is_alive = always_false

             action()

-            self.assertTrue(self._server._logger._background_flush.is_alive())
-            self.assertTrue(self._server._logger._background_retry.is_alive())
+            self.assertTrue(self._server._logger._logger_worker.worker_thread.is_alive())

     def _spec_store_none_restart_test(self, actions: List[Callable]):
         for action in actions:
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 7921904..652fc21 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -3,6 +3,7 @@
 from unittest.mock import patch

 from gzip_helpers import GzipHelpers
+from statsig import globals
 from statsig.statsig_event import StatsigEvent
 from statsig.statsig_options import StatsigOptions
 from statsig.statsig_server import StatsigServer
@@ -40,10 +41,19 @@ def on_log(url: str, **kwargs):

         self._network_stub.stub_request_with_function("log_event", 202, on_log)

+        globals.STATSIG_LOGGING_INTERVAL_SECONDS = 1
         self._instance.initialize("secret-key", options)
         self._user = StatsigUser("dloomb")

         ## clear diagnostics initialize log
+        self.flush()
+
+    def tearDown(self):
+        globals.STATSIG_LOGGING_INTERVAL_SECONDS = 5.0
+        self._instance.shutdown()
+
+    @patch('requests.request', side_effect=_network_stub.mock)
+    def flush(self, mock_request):
         self._instance.flush()

     @patch('requests.request', side_effect=_network_stub.mock)
@@ -145,7 +155,7 @@ def test_log_content(self, mock_request):
     def _run_and_wait_for_logs(self, task):
         self._didLog = threading.Event()
         task()
-        self._didLog.wait(1)
+        self._didLog.wait(2)


 if __name__ == '__main__':
diff --git a/tests/test_logger_worker.py b/tests/test_logger_worker.py
new file mode 100644
index 0000000..0dd0481
--- /dev/null
+++ b/tests/test_logger_worker.py
@@ -0,0 +1,78 @@
+import unittest
+from unittest.mock import patch
+
+import statsig.statsig
+import random
+
+from statsig import globals, StatsigServer, StatsigOptions, StatsigUser
+from tests.network_stub import NetworkStub
+
+
+class LoggerTest(unittest.TestCase):
+    _network_stub = NetworkStub("http://logger-worker-test")
+
+    def setUp(self):
+        self._instance = StatsigServer()
+        options = StatsigOptions(
+            api="http://logger-worker-test",
+            disable_diagnostics=True,
+            rulesets_sync_interval=100000,  # Skip config sync and avoid diagnostics event
+            idlists_sync_interval=100000  # Skip config sync and avoid diagnostics event
+        )
+
+        self._network_stub.reset()
+
+        self._network_stub.stub_request_with_value("log_event", 202, {})
+        self._instance.initialize("secret-key", options)
+        self._user = StatsigUser("dloomb")
+
+        ## clear diagnostics initialize log
+        self.flush()
+
+
+    @patch('requests.request', side_effect=_network_stub.mock)
+    def flush(self, mock_request):
+        self._instance.flush()
+
+    def test_backoff_intervals(self):
+        ease_out_backoff_intervals = [5, 10, 20, 40, 80, 120, 120, 120, 120, 120]
+        ease_in_backoff_intervals = [120, 60, 30, 15, 7.5, 5, 5, 5, 5, 5]
+
+        actual_out_intervals = []
+        actual_in_intervals = []
+
+        for i in range(10):
+            curr_interval = self._instance._logger._logger_worker._log_interval
+            actual_out_intervals.append(curr_interval)
+            self._instance._logger._logger_worker._failure_backoff()
+
+        are_equal = all(float(a) == float(b) for a, b in zip(ease_out_backoff_intervals, actual_out_intervals))
+        self.assertTrue(are_equal)
+
+        for i in range(10):
+            curr_interval = self._instance._logger._logger_worker._log_interval
+            actual_in_intervals.append(curr_interval)
+            self._instance._logger._logger_worker._success_backoff()
+
+        are_equal = all(float(a) == float(b) for a, b in zip(ease_in_backoff_intervals, actual_in_intervals))
+        self.assertTrue(are_equal)
+
+
+    def test_variable_backoff_intervals(self):
+        out_of_range = False
+
+        def randomly_backoff():
+            which_backoff = random.choice([True, False])
+            if which_backoff:
+                self._instance._logger._logger_worker._failure_backoff()
+            else:
+                self._instance._logger._logger_worker._success_backoff()
+
+        for i in range(50):
+            curr_interval = self._instance._logger._logger_worker._log_interval
+            if curr_interval < 5 or curr_interval > 120:
+                out_of_range = True
+                break
+            randomly_backoff()
+
+        self.assertFalse(out_of_range)
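
Reviewer note: the adaptive flush interval introduced in `statsig_logger_worker.py` is easiest to sanity-check in isolation. Below is a minimal standalone sketch of the same policy — double the interval after a failed post up to 120 seconds, halve it after a success back down to the 5-second base — using the constants from this patch. The helper names here are illustrative; the real logic lives in `LoggerWorker._failure_backoff` and `LoggerWorker._success_backoff`.

```python
# Minimal sketch of the LoggerWorker backoff policy, using this patch's constants.
# Helper names are illustrative, not part of the SDK.
BASE_INTERVAL_SECONDS = 5.0    # globals.STATSIG_LOGGING_INTERVAL_SECONDS
BACKOFF_MULTIPLIER = 2.0
MAX_FAILURE_BACKOFF_SECONDS = 120.0
MIN_SUCCESS_BACKOFF_SECONDS = 5.0


def next_interval_on_failure(interval: float) -> float:
    # Double the wait between flushes after a failed /log_event post, capped at 120s.
    return min(interval * BACKOFF_MULTIPLIER, MAX_FAILURE_BACKOFF_SECONDS)


def next_interval_on_success(interval: float) -> float:
    # Halve the wait after a successful post, floored at the 5s base.
    return max(interval / BACKOFF_MULTIPLIER, MIN_SUCCESS_BACKOFF_SECONDS)


interval = BASE_INTERVAL_SECONDS
observed = []
for _ in range(6):
    observed.append(interval)
    interval = next_interval_on_failure(interval)
print(observed)  # [5.0, 10.0, 20.0, 40.0, 80.0, 120.0] -- the ease_out sequence asserted in the test above
```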
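The batching and retry flow can be summarized the same way. Events buffer in `EventBatchProcessor` until `event_queue_size` (default 500) is reached or the 60-second batching interval fires; each resulting batch sits in a deque bounded by `retry_queue_size` (default 10). A failed post re-enqueues the batch with `retries + 1` and gives up after 10 retries, while a deque overflow evicts the oldest batch and adds its `event_count` to a dropped-events counter that is reported periodically. A simplified sketch, with a hypothetical `Batch` standing in for `BatchEventLogs`:

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List


@dataclass
class Batch:  # hypothetical stand-in for BatchEventLogs
    events: List[dict] = field(default_factory=list)
    retries: int = 0


retry_queue: Deque[Batch] = deque(maxlen=10)  # retry_queue_size, default 10
dropped_events = 0


def enqueue(batch: Batch) -> None:
    # Mirrors EventBatchProcessor.add_to_batched_events_queue: deque(maxlen=N)
    # silently evicts the oldest entry on append, so count its events as dropped first.
    global dropped_events
    if retry_queue.maxlen is not None and len(retry_queue) >= retry_queue.maxlen:
        dropped_events += len(retry_queue[0].events)
    retry_queue.append(batch)


def on_flush_failure(batch: Batch) -> None:
    # Mirrors the failure branch of LoggerWorker._flush_to_server: drop the
    # request after 10 retries, otherwise re-enqueue with the retry count bumped.
    if batch.retries >= 10:
        print(f"dropping {len(batch.events)} events after 10 retries")
        return
    enqueue(Batch(batch.events, batch.retries + 1))
```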
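On the configuration surface: `logging_interval` on `StatsigOptions` is now effectively deprecated, since the batching (60s) and flush (5s) cadences come from the module-level constants in `statsig/globals.py`, and the new `retry_queue_size` option bounds the retry deque. A short usage sketch following what the tests in this patch do:

```python
from statsig import globals
from statsig.statsig_options import StatsigOptions

# The tests shorten the flush cadence by overriding the module constant directly
# (test_logger.py sets it to 1 in setUp and restores 5.0 in tearDown).
globals.STATSIG_LOGGING_INTERVAL_SECONDS = 1

# retry_queue_size bounds the deque of batches awaiting (re)send; default is 10.
options = StatsigOptions(retry_queue_size=20)
```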