Count outstanding tasks on submit side rather than in interchange (#3637)

The count of outstanding tasks may now be higher:

A task will be counted when it enters the task submission
ZMQ channel to the interchange, rather than when it reaches
the end of the ZMQ channel (which could be a queue under load),
and a task will be counted as complete when it arrives back
on the submit side, rather than when the interchange places it
into the results channel.
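
The counting rule above can be sketched in isolation. This is a minimal illustration, not Parsl's code: the class `SubmitSideCounter` and its methods `submit` and `result_arrived` are hypothetical names, and the only detail taken from this commit is that the outstanding count is the length of a submit-side `tasks` table.

```python
import threading
from concurrent.futures import Future
from typing import Any, Dict, Tuple


class SubmitSideCounter:
    """Hypothetical stand-in for submit-side task bookkeeping."""

    def __init__(self) -> None:
        self.tasks: Dict[int, Future] = {}
        self._lock = threading.Lock()
        self._next_id = 0

    def submit(self, payload: Any) -> Tuple[int, Future]:
        # The task is counted here, before it has travelled down the
        # outgoing channel (the channel itself is omitted from this sketch).
        fut: Future = Future()
        with self._lock:
            task_id = self._next_id
            self._next_id += 1
            self.tasks[task_id] = fut
        return task_id, fut

    def result_arrived(self, task_id: int, result: Any) -> None:
        # The task stops being counted only once its result is back
        # on the submit side.
        with self._lock:
            fut = self.tasks.pop(task_id)
        fut.set_result(result)

    @property
    def outstanding(self) -> int:
        # Computed locally: no request to another process is needed.
        return len(self.tasks)


counter = SubmitSideCounter()
id_a, fut_a = counter.submit("a")
id_b, fut_b = counter.submit("b")
count_after_submit = counter.outstanding
counter.result_arrived(id_a, 42)
count_after_result = counter.outstanding
```

In this sketch a task that is still sitting in the outgoing channel is already counted, and a task whose result is still in transit back is still counted, which is why the reported number can be higher than before.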

It's probably more desirable to count the tasks earlier like
this and less desirable to count the results later like this,
for scaling purposes - scaling load will be observed before it
reaches the interchange, but reduction in load due to completion
won't be observed until the result has got all the way through
the result queue. I don't think this will affect users in real
life - but I don't have any numbers for what delivery times
look like across the two queues under heavy load.

This removes an RPC that could be computed locally, and removes a use
of the command channel which is generally thread-unsafe.

This removes one cause of shutdown hangs: when pressing Ctrl-C
at the right point, shutdown will wait for the strategy
thread to shut down, but the strategy thread can get blocked on
OUTSTANDING_C, and OUTSTANDING_C will never return because
the Ctrl-C also made the interchange go away.

This shutdown block is hard to demonstrate in a CI
test because there are other blocking actions (such as a
MANAGERS RPC) involved in shutdown, and it seems to need the
strategy thread to be in just the right place.
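
The blocking pattern described above can be imitated outside Parsl with standard-library pieces. This is a simplified stand-in under stated assumptions: a `queue.Queue` that nobody writes to plays the part of a reply channel whose server has exited, and the names `outstanding_via_rpc`, `outstanding_local` and `finishes_in_time` are hypothetical. It does not reproduce the real shutdown sequence.

```python
import queue
import threading

# Stand-in for a reply channel whose server has already gone away:
# nothing will ever put a reply on this queue.
reply_channel: queue.Queue = queue.Queue()

# Stand-in for a submit-side task table.
local_tasks = {1: "a", 2: "b"}


def outstanding_via_rpc() -> int:
    # Blocks until a reply arrives, which never happens in this sketch.
    return reply_channel.get()


def outstanding_local() -> int:
    # Needs no other process, so it cannot block on one.
    return len(local_tasks)


def finishes_in_time(fn, timeout: float = 0.2) -> bool:
    """Run fn in a daemon thread; report whether it finished within timeout."""
    worker = threading.Thread(target=fn, daemon=True)
    worker.start()
    worker.join(timeout)
    return not worker.is_alive()


rpc_finished = finishes_in_time(outstanding_via_rpc)
local_finished = finishes_in_time(outstanding_local)
```

A shutdown path that joins the first kind of thread without a timeout would wait indefinitely, while the locally computed count returns at once.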

Implements #3365
benclifford authored Oct 14, 2024
1 parent 264d128 commit 658619d
Showing 2 changed files with 2 additions and 8 deletions.
2 changes: 1 addition & 1 deletion parsl/executors/high_throughput/executor.py
@@ -582,7 +582,7 @@ def hold_worker(self, worker_id: str) -> None:
     def outstanding(self) -> int:
         """Returns the count of tasks outstanding across the interchange
         and managers"""
-        return self.command_client.run("OUTSTANDING_C")
+        return len(self.tasks)

     @property
     def connected_workers(self) -> int:
8 changes: 1 addition & 7 deletions parsl/executors/high_throughput/interchange.py
@@ -252,13 +252,7 @@ def _command_server(self) -> NoReturn:
             try:
                 command_req = self.command_channel.recv_pyobj()
                 logger.debug("Received command request: {}".format(command_req))
-                if command_req == "OUTSTANDING_C":
-                    outstanding = self.pending_task_queue.qsize()
-                    for manager in self._ready_managers.values():
-                        outstanding += len(manager['tasks'])
-                    reply = outstanding
-
-                elif command_req == "CONNECTED_BLOCKS":
+                if command_req == "CONNECTED_BLOCKS":
                     reply = self.connected_block_history

                 elif command_req == "WORKERS":
