Teach Executor a bursty rate-limit
The Executor currently drains the incoming task queue, stopping at `.batch_size`
items, before submitting to the API.  If tasks are `.submit()`ed at just the
right cadence, it's possible to get the Executor to send lots of small batches
very frequently rather than coalescing more tasks per call.

The fix implemented here is to allow up to 4 back-to-back API `/submit` calls
within a 16s window, _counting only submissions that do not fully utilize the
entire batch_.  If more API calls come in after that, induce a 4s delay so as to
play nice with the API.  The burst window should be ample for a human working
through cells in a Jupyter notebook without being an annoyance, and once the
burst is spent there is only a 4s delay per call until the window clears.
Meanwhile, for power users, the burst limit and window are configurable (to a
point).
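
For illustration, the core idea is a sliding window of timestamps for recent
"small" submissions (a rough sketch with made-up names, not the code in this
commit -- see the diff below for the real logic):

    import random
    import time

    API_BURST_LIMIT, API_BURST_WINDOW_S = 4, 16  # defaults; clamped if configured
    recent_small_sends: list[float] = []  # monotonic timestamps of under-filled batches

    def maybe_delay_before_submit() -> None:
        # Drop timestamps that have aged out of the window; if the remainder
        # exhausts the burst budget, back off to the steady rate plus jitter.
        global recent_small_sends
        now = time.monotonic()
        cutoff = now - API_BURST_WINDOW_S
        recent_small_sends = [ts for ts in recent_small_sends if ts > cutoff]
        if len(recent_small_sends) >= API_BURST_LIMIT:
            steady_s = API_BURST_WINDOW_S / API_BURST_LIMIT  # 16s / 4 calls == 4s
            delay = max(steady_s - (now - recent_small_sends[-1]), 0) + random.random()
            time.sleep(delay)

    def record_send(num_tasks: int, batch_size: int) -> None:
        # A completely filled batch does not count against the burst budget
        if num_tasks < batch_size:
            recent_small_sends.append(time.monotonic())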

Note that this is mostly intended as education for users -- it is client-side
and so entirely up to the user to ignore if they so choose.  One example: the
rate-limit state is kept entirely within a single Executor instance, so simply
creating a new Executor is an easy workaround.
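
For instance, something like the following (a hypothetical snippet; the
endpoint UUID is a placeholder) would tune the knobs at construction time,
keeping in mind that the constructor clamps them to at most 8 calls and a 32s
window:

    from globus_compute_sdk import Executor

    def double(x):
        return 2 * x

    # Placeholder endpoint id; api_burst_limit and api_burst_window_s are the new knobs
    gce = Executor(
        endpoint_id="00000000-0000-0000-0000-000000000000",
        api_burst_limit=8,
        api_burst_window_s=32,
    )
    futures = [gce.submit(double, i) for i in range(10)]
    print([f.result() for f in futures])
    gce.shutdown()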

(Discovered via log trawling and conversations with a user in funcx#help.)

[sc-35759]
khk-globus committed Sep 30, 2024
1 parent f8b0c16 commit 1f0f752
Showing 3 changed files with 103 additions and 5 deletions.
@@ -0,0 +1,15 @@
Changed
^^^^^^^

- The Executor now implements a bursty rate-limit in the background submission
  thread. The Executor is designed to coalesce up to ``.batch_size`` of tasks
  and submit them in a single API call. But if tasks are supplied to the
  Executor at just the right frequency, it will send much smaller batches more
  frequently which is "not nice" to the API. This change allows "bursts" of up
  to 4 API calls in a 16s window, and then will back off to submit every 4
  seconds. Notes:

  - ``.batch_size`` currently defaults to 128 but is user-settable

  - If the Executor is able to completely fill the batch of tasks sent to the
    API, that call is not counted toward the burst limit
49 changes: 47 additions & 2 deletions compute_sdk/globus_compute_sdk/sdk/executor.py
@@ -48,7 +48,6 @@ class InvalidStateError(Exception):
from pika.frame import Method
from pika.spec import Basic, BasicProperties


_REGISTERED_EXECUTORS: dict[int, Executor] = {}
_RESULT_WATCHERS: dict[uuid.UUID, _ResultWatcher] = {}

@@ -136,6 +135,8 @@ def __init__(
        label: str = "",
        batch_size: int = 128,
        amqp_port: int | None = None,
        api_burst_limit: int = 4,
        api_burst_window_s: int = 16,
        **kwargs,
    ):
        """
@@ -156,6 +157,10 @@ def __init__(
            sending upstream [min: 1, default: 128]
        :param amqp_port: Port to use when connecting to results queue. Note that the
            Compute web services only support 5671, 5672, and 443.
        :param api_burst_limit: Number of "free" API calls to allow before engaging
            client-side (i.e., this executor) rate-limiting. See ``api_burst_window_s``
        :param api_burst_window_s: Window of time (in seconds) in which to count API
            calls for rate-limiting.
        """
        deprecated_kwargs = {""}
        for key in kwargs:
@@ -192,6 +197,9 @@ def __init__(
        self.label = label
        self.batch_size = max(1, batch_size)

        self.api_burst_limit = min(8, max(1, api_burst_limit))
        self.api_burst_window_s = min(32, max(1, api_burst_window_s))

        self.task_count_submitted = 0
        self._task_counter: int = 0
        self._tasks_to_send: queue.Queue[
@@ -868,6 +876,8 @@ def __hash__(self):
            t.List[_TaskSubmissionInfo],
        ]

        api_burst_ts: list[float] = []
        api_burst_fill: list[float] = []
        try:
            fut: ComputeFuture | None = ComputeFuture()  # just start the loop; please
            while fut is not None:
@@ -899,18 +909,53 @@ def __hash__(self):
                if not tasks:
                    continue  # fut and task are None; "single point of exit"

                api_rate_steady = self.api_burst_window_s / self.api_burst_limit
                for submit_group, task_list in tasks.items():
                    fut_list = futs[submit_group]
                    num_tasks = len(task_list)

                    tg_uuid, ep_uuid, res_spec, uep_config = submit_group
                    log.info(
                        f"Submitting tasks for Task Group {tg_uuid} to"
                        f" Endpoint {ep_uuid}: {len(task_list):,}"
                        f" Endpoint {ep_uuid}: {num_tasks:,}"
                    )

                    if api_burst_ts:
                        now = time.monotonic()
                        then = now - self.api_burst_window_s
                        api_burst_ts = [i for i in api_burst_ts if i > then]
                        api_burst_fill = api_burst_fill[-len(api_burst_ts) :]
                        if len(api_burst_ts) >= self.api_burst_limit:
                            delay = api_rate_steady - (now - api_burst_ts[-1])
                            delay = max(delay, 0) + random.random()
                            _burst_rel = [f"{now - s:.2f}" for s in api_burst_ts]
                            _burst_fill = [f"{p:.1f}%" for p in api_burst_fill]

                            log.warning(
                                "%r (tid:%s): API rate-limit delay of %.2fs"
                                "\n Consider submitting more tasks at once."
                                "\n batch_size = %d"
                                "\n api_burst_limit = %s"
                                "\n api_burst_window_s = %s (seconds)"
                                "\n recent sends: %s"
                                "\n recent batch fill percent: %s",
                                self,
                                _tid,
                                delay,
                                self.batch_size,
                                self.api_burst_limit,
                                self.api_burst_window_s,
                                ", ".join(_burst_rel),
                                ", ".join(_burst_fill),
                            )
                            time.sleep(delay)
                    self._submit_tasks(
                        tg_uuid, ep_uuid, res_spec, uep_config, fut_list, task_list
                    )
                    if num_tasks < self.api_burst_limit:
                        api_burst_ts.append(time.monotonic())
                        fill_percent = 100 * num_tasks / self.batch_size
                        api_burst_fill.append(fill_percent)

                    to_watch = [f for f in fut_list if f.task_id and not f.done()]
                    if not to_watch:
44 changes: 41 additions & 3 deletions compute_sdk/tests/unit/test_executor.py
@@ -938,10 +938,12 @@ def test_task_submitter_respects_batch_size(gc_executor, batch_size: int):

gce.endpoint_id = uuid.uuid4()
gce.batch_size = batch_size
for _ in range(num_batches * batch_size):
gce.submit(noop)
with mock.patch(f"{_MOCK_BASE}time.sleep"):
for _ in range(num_batches * batch_size):
gce.submit(noop)

try_assert(lambda: gcc.batch_run.call_count >= num_batches)

try_assert(lambda: gcc.batch_run.call_count >= num_batches)
for args, _kwargs in gcc.batch_run.call_args_list:
*_, batch = args
assert 0 < batch.add.call_count <= batch_size
@@ -1016,6 +1018,42 @@ def _mock_max(*a, **k):
assert found_tg_uuid == expected


@pytest.mark.parametrize("burst_limit", (2, 3, 4))
@pytest.mark.parametrize("burst_window", (2, 3, 4))
def test_task_submitter_api_rate_limit(
    gc_executor, mock_log, burst_limit, burst_window
):
    gcc, gce = gc_executor
    gce.endpoint_id = uuid.uuid4()
    gce._submit_tasks = mock.Mock()

    gce._function_registry[gce._fn_cache_key(noop)] = str(uuid.uuid4())
    gce.api_burst_limit = burst_limit
    gce.api_burst_window_s = burst_window
    gce.batch_size = random.randint(2, 10)

    exp_rate_limit = random.randint(1, 10)
    exp_api_submits = burst_limit + exp_rate_limit
    uep_confs = [{"something": i} for i in range(exp_api_submits)]
    with mock.patch(f"{_MOCK_BASE}time.sleep"):
        for uep_conf in uep_confs:
            gce.user_endpoint_config = uep_conf
            gce.submit(noop)

    try_assert(lambda: gce._submit_tasks.call_count == exp_api_submits)

    exp_perc = [100 / gce.batch_size for _ in range(1, exp_api_submits)]
    exp_perc_text = ", ".join(f"{p:.1f}%" for p in exp_perc)
    cal = [(a, k) for a, k in mock_log.warning.call_args_list if "api_burst" in a[0]]
    assert len(cal) == exp_rate_limit, "Expect log when rate limiting"

    a, k = cal[-1]
    assert "batch_size" in a[0], "Expect current value reported"
    assert "API rate-limit" in a[0], "Expect basic explanation of why delayed"
    assert "recent batch fill percent: %s" in a[0]
    assert exp_perc_text == a[-1], "Expect to share batch utilization %"

def test_task_submit_handles_multiple_user_endpoint_configs(
mocker: MockerFixture, gc_executor
):
