From 1f0f752ce1ad8e83aa46beac001986a6feac03b4 Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Wed, 18 Sep 2024 22:06:31 -0400 Subject: [PATCH] Teach Executor a bursty rate-limit The Executor currently drains the incoming task queue or stops draining at `.batch_size` before submitting to the API. If tasks are `.submit()`ed at just the right cadence, it's possible to get the Executor to send lots of small batches very frequently rather than coalescing more tasks. The fix implemented here is to allow up to 4 back-to-back API `/submit` calls within a window of 16s _of submissions that do not fully utilize the entire batch_. If more API calls come in after than, then induce a 4s delay so as to play nice with the API. The burst window will hopefully be ample to work within a Jupyter notebook and not be an annoyance to a human working through cells, and only a 4s delay after that until the window clears. Meanwhile, for power users, the burst limit and window are configurable (to a point). Note that this is mostly intended as an education to users -- this is client-side and so completely up to the user to ignore if they so choose. One example: simply create a new Executor -- the state is entirely kept within a single executor so creating a new instance would be a simple workaround. (Discovered via log trawling and conversations with a user in funcx#help.) [sc-35759] --- ...evin_bursty_rate_limit_executor_submit.rst | 15 ++++++ .../globus_compute_sdk/sdk/executor.py | 49 ++++++++++++++++++- compute_sdk/tests/unit/test_executor.py | 44 +++++++++++++++-- 3 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 changelog.d/20240918_223810_kevin_bursty_rate_limit_executor_submit.rst diff --git a/changelog.d/20240918_223810_kevin_bursty_rate_limit_executor_submit.rst b/changelog.d/20240918_223810_kevin_bursty_rate_limit_executor_submit.rst new file mode 100644 index 000000000..b96b71026 --- /dev/null +++ b/changelog.d/20240918_223810_kevin_bursty_rate_limit_executor_submit.rst @@ -0,0 +1,15 @@ +Changed +^^^^^^^ + +- The Executor now implements a bursty rate-limit in the background submission + thread. The Executor is designed to coalesce up to ``.batch_size`` of tasks + and submit them in a single API call. But if tasks are supplied to the + Executor at just the right frequency, it will send much smaller batches more + frequently which is "not nice" to the API. This change allows "bursts" of up + to 4 API calls in a 16s window, and then will back off to submit every 4 + seconds. Notes: + + - ``.batch_size`` currently defaults to 128 but is user-settable + + - If the Executor is able to completely fill the batch of tasks sent to the + API, that call is not counted toward the burst limit diff --git a/compute_sdk/globus_compute_sdk/sdk/executor.py b/compute_sdk/globus_compute_sdk/sdk/executor.py index 86f401744..7432bdab4 100644 --- a/compute_sdk/globus_compute_sdk/sdk/executor.py +++ b/compute_sdk/globus_compute_sdk/sdk/executor.py @@ -48,7 +48,6 @@ class InvalidStateError(Exception): from pika.frame import Method from pika.spec import Basic, BasicProperties - _REGISTERED_EXECUTORS: dict[int, Executor] = {} _RESULT_WATCHERS: dict[uuid.UUID, _ResultWatcher] = {} @@ -136,6 +135,8 @@ def __init__( label: str = "", batch_size: int = 128, amqp_port: int | None = None, + api_burst_limit: int = 4, + api_burst_window_s: int = 16, **kwargs, ): """ @@ -156,6 +157,10 @@ def __init__( sending upstream [min: 1, default: 128] :param amqp_port: Port to use when connecting to results queue. Note that the Compute web services only support 5671, 5672, and 443. + :param api_burst_limit: Number of "free" API calls to allow before engaging + client-side (i.e., this executor) rate-limiting. See ``api_burst_window_s`` + :param api_burst_window_s: Window of time (in seconds) in which to count API + calls for rate-limiting. """ deprecated_kwargs = {""} for key in kwargs: @@ -192,6 +197,9 @@ def __init__( self.label = label self.batch_size = max(1, batch_size) + self.api_burst_limit = min(8, max(1, api_burst_limit)) + self.api_burst_window_s = min(32, max(1, api_burst_window_s)) + self.task_count_submitted = 0 self._task_counter: int = 0 self._tasks_to_send: queue.Queue[ @@ -868,6 +876,8 @@ def __hash__(self): t.List[_TaskSubmissionInfo], ] + api_burst_ts: list[float] = [] + api_burst_fill: list[float] = [] try: fut: ComputeFuture | None = ComputeFuture() # just start the loop; please while fut is not None: @@ -899,18 +909,53 @@ def __hash__(self): if not tasks: continue # fut and task are None; "single point of exit" + api_rate_steady = self.api_burst_window_s / self.api_burst_limit for submit_group, task_list in tasks.items(): fut_list = futs[submit_group] + num_tasks = len(task_list) tg_uuid, ep_uuid, res_spec, uep_config = submit_group log.info( f"Submitting tasks for Task Group {tg_uuid} to" - f" Endpoint {ep_uuid}: {len(task_list):,}" + f" Endpoint {ep_uuid}: {num_tasks:,}" ) + if api_burst_ts: + now = time.monotonic() + then = now - self.api_burst_window_s + api_burst_ts = [i for i in api_burst_ts if i > then] + api_burst_fill = api_burst_fill[-len(api_burst_ts) :] + if len(api_burst_ts) >= self.api_burst_limit: + delay = api_rate_steady - (now - api_burst_ts[-1]) + delay = max(delay, 0) + random.random() + _burst_rel = [f"{now - s:.2f}" for s in api_burst_ts] + _burst_fill = [f"{p:.1f}%" for p in api_burst_fill] + + log.warning( + "%r (tid:%s): API rate-limit delay of %.2fs" + "\n Consider submitting more tasks at once." + "\n batch_size = %d" + "\n api_burst_limit = %s" + "\n api_burst_window_s = %s (seconds)" + "\n recent sends: %s" + "\n recent batch fill percent: %s", + self, + _tid, + delay, + self.batch_size, + self.api_burst_limit, + self.api_burst_window_s, + ", ".join(_burst_rel), + ", ".join(_burst_fill), + ) + time.sleep(delay) self._submit_tasks( tg_uuid, ep_uuid, res_spec, uep_config, fut_list, task_list ) + if num_tasks < self.api_burst_limit: + api_burst_ts.append(time.monotonic()) + fill_percent = 100 * num_tasks / self.batch_size + api_burst_fill.append(fill_percent) to_watch = [f for f in fut_list if f.task_id and not f.done()] if not to_watch: diff --git a/compute_sdk/tests/unit/test_executor.py b/compute_sdk/tests/unit/test_executor.py index cc57cbd6b..1da47e335 100644 --- a/compute_sdk/tests/unit/test_executor.py +++ b/compute_sdk/tests/unit/test_executor.py @@ -938,10 +938,12 @@ def test_task_submitter_respects_batch_size(gc_executor, batch_size: int): gce.endpoint_id = uuid.uuid4() gce.batch_size = batch_size - for _ in range(num_batches * batch_size): - gce.submit(noop) + with mock.patch(f"{_MOCK_BASE}time.sleep"): + for _ in range(num_batches * batch_size): + gce.submit(noop) + + try_assert(lambda: gcc.batch_run.call_count >= num_batches) - try_assert(lambda: gcc.batch_run.call_count >= num_batches) for args, _kwargs in gcc.batch_run.call_args_list: *_, batch = args assert 0 < batch.add.call_count <= batch_size @@ -1016,6 +1018,42 @@ def _mock_max(*a, **k): assert found_tg_uuid == expected +@pytest.mark.parametrize("burst_limit", (2, 3, 4)) +@pytest.mark.parametrize("burst_window", (2, 3, 4)) +def test_task_submitter_api_rate_limit( + gc_executor, mock_log, burst_limit, burst_window +): + gcc, gce = gc_executor + gce.endpoint_id = uuid.uuid4() + gce._submit_tasks = mock.Mock() + + gce._function_registry[gce._fn_cache_key(noop)] = str(uuid.uuid4()) + gce.api_burst_limit = burst_limit + gce.api_burst_window_s = burst_window + gce.batch_size = random.randint(2, 10) + + exp_rate_limit = random.randint(1, 10) + exp_api_submits = burst_limit + exp_rate_limit + uep_confs = [{"something": i} for i in range(exp_api_submits)] + with mock.patch(f"{_MOCK_BASE}time.sleep"): + for uep_conf in uep_confs: + gce.user_endpoint_config = uep_conf + gce.submit(noop) + + try_assert(lambda: gce._submit_tasks.call_count == exp_api_submits) + + exp_perc = [100 / gce.batch_size for _ in range(1, exp_api_submits)] + exp_perc_text = ", ".join(f"{p:.1f}%" for p in exp_perc) + cal = [(a, k) for a, k in mock_log.warning.call_args_list if "api_burst" in a[0]] + assert len(cal) == exp_rate_limit, "Expect log when rate limiting" + + a, k = cal[-1] + assert "batch_size" in a[0], "Expect current value reported" + assert "API rate-limit" in a[0], "Expect basic explanation of why delayed" + assert "recent batch fill percent: %s" in a[0] + assert exp_perc_text == a[-1], "Expect to share batch utilization %" + + def test_task_submit_handles_multiple_user_endpoint_configs( mocker: MockerFixture, gc_executor ):