[not for merge] Temporary release branch for Sander Vandenhaute testing multiple outstanding PRs #3479

Closed
wants to merge 23 commits
Commits (23)
eb3e5fc
sketch of core-aware scaling for wq
benclifford May 5, 2024
4d78911
Merge remote-tracking branch 'origin/master' into benc-sander-wq-cpu-…
benclifford Jun 4, 2024
dfaba58
Use whole worker when cores not defined, to align with actual WQ beha…
benclifford Jun 4, 2024
34f3af4
Add a fairly obvious assert on block length
benclifford Jun 5, 2024
6ea2266
add some debugging around block choice for wq scale in
benclifford Jun 5, 2024
77fbb4c
Add a bit of simulated status info
benclifford Jun 5, 2024
026b18d
Merge remote-tracking branch 'origin/benc-sander-wq-cpu-scaling' into…
benclifford Jun 5, 2024
49203a9
Add a stack-depth test
benclifford Jun 5, 2024
b9eba7a
Add notes on test constant
benclifford Jun 5, 2024
d0b369b
fix flake8, mypy
benclifford Jun 5, 2024
0e978eb
WIP: dependency launcher pool
benclifford Jun 5, 2024
862c6c3
Merge remote-tracking branch 'origin/master' into benc-3472-recursion
benclifford Jun 7, 2024
3764b93
perform launch_if_ready asynchronously, at start of a new stack
benclifford Jun 7, 2024
14dc325
Merge remote-tracking branch 'origin/master' into benc-tmp-sander
benclifford Jun 7, 2024
7ad35bb
use _status not block ID table
benclifford Jun 7, 2024
f5b27f2
Filter on non-terminal blocks, like htex does
benclifford Jun 7, 2024
b342639
Merge branches 'benc-tmp-sander' and 'benc-3472-recursion' into benc-…
benclifford Jun 7, 2024
324465a
isort
benclifford Jun 7, 2024
8fb25a4
Merge branch 'benc-3472-recursion' into benc-sander-paper
benclifford Jun 7, 2024
dce6eb3
Remove cancel futures because it needs Python 3.9
benclifford Jun 7, 2024
464c71e
rename for consistency with other test
benclifford Jun 7, 2024
d6930a4
Logging and docstrings
benclifford Jun 7, 2024
d3a07f1
Merge branch 'benc-3472-recursion' into benc-sander-paper
benclifford Jun 7, 2024
25 changes: 18 additions & 7 deletions parsl/dataflow/dflow.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import atexit
import concurrent.futures as cf
import datetime
import inspect
import logging
@@ -209,6 +210,8 @@ def __init__(self, config: Config) -> None:
self.tasks: Dict[int, TaskRecord] = {}
self.submitter_lock = threading.Lock()

self.dependency_launch_pool = cf.ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dependency-Launch")

self.dependency_resolver = self.config.dependency_resolver if self.config.dependency_resolver is not None \
else SHALLOW_DEPENDENCY_RESOLVER

@@ -611,9 +614,9 @@ def check_staging_inhibited(kwargs: Dict[str, Any]) -> bool:
return kwargs.get('_parsl_staging_inhibit', False)

def launch_if_ready(self, task_record: TaskRecord) -> None:
"""
launch_if_ready will launch the specified task, if it is ready
to run (for example, without dependencies, and in pending state).
"""Schedules a task record for re-inspection to see if it is ready
for launch and for launch if it is ready. The call will return
immediately.

This should be called by any piece of the DataFlowKernel that
thinks a task may have become ready to run.
@@ -622,13 +625,17 @@ def launch_if_ready(self, task_record: TaskRecord) -> None:
ready to run - launch_if_ready will not incorrectly launch that
task.

It is also not an error to call launch_if_ready on a task that has
already been launched - launch_if_ready will not re-launch that
task.

launch_if_ready is thread safe, so may be called from any thread
or callback.
"""
self.dependency_launch_pool.submit(self._launch_if_ready_async, task_record)

@wrap_with_logs
def _launch_if_ready_async(self, task_record: TaskRecord) -> None:
"""
_launch_if_ready will launch the specified task, if it is ready
to run (for example, without dependencies, and in pending state).
"""
exec_fu = None

task_id = task_record['id']
@@ -1271,6 +1278,10 @@ def cleanup(self) -> None:
self.monitoring.close()
logger.info("Terminated monitoring")

logger.info("Terminating dependency launch pool")
self.dependency_launch_pool.shutdown()
logger.info("Terminated dependency launch pool")

logger.info("Unregistering atexit hook")
atexit.unregister(self.atexit_cleanup)
logger.info("Unregistered atexit hook")
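Note on the dflow.py change above: launch_if_ready no longer runs the launch logic on the caller's stack; it hands the task record to a single-worker thread pool, so long chains of dependent tasks are launched one after another from a fresh, shallow stack instead of recursively through callback chains. The following standalone sketch (illustrative names only, not Parsl code) shows how deferring each "launch" to a one-thread ThreadPoolExecutor keeps the stack shallow even for a very deep dependency chain.

```python
import concurrent.futures as cf
import inspect
import threading

# Illustrative sketch only (not Parsl code): a single-worker pool serialises
# "launch" work so that each launch runs on a fresh, shallow stack instead of
# recursing through callback chains on the submitting thread.
launch_pool = cf.ThreadPoolExecutor(max_workers=1, thread_name_prefix="Dependency-Launch")
done = threading.Event()


def launch_if_ready(task_id: int) -> None:
    # Returns immediately; the real work happens later in the pool thread.
    launch_pool.submit(_launch_if_ready_async, task_id)


def _launch_if_ready_async(task_id: int) -> None:
    assert len(inspect.stack()) < 50, "launches should not nest deeply"
    if task_id == 0:
        done.set()
        return
    # Completing this task makes its dependent task "ready": schedule it via
    # the pool rather than calling it directly, so the stack does not grow.
    launch_if_ready(task_id - 1)


launch_if_ready(5000)  # far deeper than a directly recursive chain could go
done.wait()
launch_pool.shutdown()
```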
5 changes: 5 additions & 0 deletions parsl/executors/status_handling.py
@@ -123,6 +123,8 @@ def status(self) -> Dict[str, JobStatus]:
status = self._make_status_dict(block_ids, self._provider.status(job_ids))
else:
status = {}

logger.debug("Adding these simulated status entries onto provider-returned status: %s", self._simulated_status)
status.update(self._simulated_status)

return status
@@ -233,6 +235,7 @@ def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]:
job_ids = [] # types: List[Any]
for bid in block_ids:
job_ids.append(self.blocks_to_job_id[bid])
assert len(block_ids) == len(job_ids)
return block_ids, job_ids

@abstractproperty
@@ -300,6 +303,8 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
for block_id in block_ids:
new_status[block_id] = JobStatus(JobState.CANCELLED)
del self._status[block_id]
if block_id in self._simulated_status:
logger.warning("block %s is in simulated status: not properly deleted", block_id)
self.send_monitoring_info(new_status)
return block_ids

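For context on the simulated-status changes above: the executor keeps a local _simulated_status dict for blocks whose state Parsl tracks itself rather than learning from the provider, and status() overlays it onto whatever the provider reports. A minimal sketch of that overlay follows, with plain dicts and strings standing in for the real JobStatus objects.

```python
# Illustrative sketch: provider-reported states are overlaid with Parsl-side
# "simulated" states, which take precedence for their blocks.
provider_status = {"0": "RUNNING", "1": "RUNNING", "2": "COMPLETED"}
simulated_status = {"1": "FAILED"}  # e.g. a block Parsl itself marked as failed

status = dict(provider_status)
status.update(simulated_status)  # simulated entries win
assert status == {"0": "RUNNING", "1": "FAILED", "2": "COMPLETED"}

# If a block is scaled in but its entry is left behind in simulated_status,
# that stale entry keeps overriding later provider reports, which appears to
# be what the new warning in scale_in_facade is flagging.
```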
39 changes: 33 additions & 6 deletions parsl/executors/workqueue/executor.py
Expand Up @@ -31,6 +31,7 @@
from parsl.executors.errors import ExecutorError
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.executors.workqueue import exec_parsl_function
from parsl.jobs.states import TERMINAL_STATES
from parsl.process_loggers import wrap_with_logs
from parsl.providers import CondorProvider, LocalProvider
from parsl.providers.base import ExecutionProvider
@@ -244,12 +245,14 @@ def __init__(self,
full_debug: bool = True,
worker_executable: str = 'work_queue_worker',
function_dir: Optional[str] = None,
coprocess: bool = False):
coprocess: bool = False,
scaling_assume_core_slots_per_worker: int = 1):
BlockProviderExecutor.__init__(self, provider=provider,
block_error_handler=True)
if not _work_queue_enabled:
raise OptionalModuleMissing(['work_queue'], "WorkQueueExecutor requires the work_queue module.")

self.scaling_assume_core_slots_per_worker = scaling_assume_core_slots_per_worker
self.label = label
self.task_queue = multiprocessing.Queue() # type: multiprocessing.Queue
self.collector_queue = multiprocessing.Queue() # type: multiprocessing.Queue
@@ -469,6 +472,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
# Create a Future object and have it be mapped from the task ID in the tasks dictionary
fu = Future()
fu.parsl_executor_task_id = executor_task_id
assert isinstance(resource_specification, dict)
fu.resource_specification = resource_specification
logger.debug("Getting tasks_lock to set WQ-level task entry")
with self.tasks_lock:
logger.debug("Got tasks_lock to set WQ-level task entry")
@@ -654,31 +659,53 @@ def initialize_scaling(self):

@property
def outstanding(self) -> int:
"""Count the number of outstanding tasks. This is inefficiently
"""Count the number of outstanding slots required. This is inefficiently
implemented and probably could be replaced with a counter.
"""
logger.debug("Calculating outstanding task slot load")
outstanding = 0
tasks = 0 # only for log message...
with self.tasks_lock:
for fut in self.tasks.values():
if not fut.done():
outstanding += 1
logger.debug(f"Counted {outstanding} outstanding tasks")
# if a task defines a resource spec with a core count, use the number
# of cores as required task slot count, instead of 1 slot.
assert isinstance(fut.resource_specification, dict) # type: ignore[attr-defined]

outstanding += fut.resource_specification.get('cores', # type: ignore[attr-defined]
self.scaling_assume_core_slots_per_worker)
tasks += 1
logger.debug(f"Counted {tasks} outstanding tasks with {outstanding} outstanding slots")
return outstanding

@property
def workers_per_node(self) -> Union[int, float]:
return 1
return self.scaling_assume_core_slots_per_worker

def scale_in(self, count: int) -> List[str]:
"""Scale in method.
"""
logger.debug("Number of jobs requested for scale in: %s", count)

candidate_blocks = [block_id for block_id, job_status in self._status.items() if job_status.state not in TERMINAL_STATES]
logger.debug("scale in candidate blocks: %s", candidate_blocks)

# Obtain list of blocks to kill
to_kill = list(self.blocks_to_job_id.keys())[:count]
to_kill = candidate_blocks[:count]

logger.debug("List of blocks to scale in: %s", to_kill)
for block_id in to_kill:
if block_id in self._status:
logger.debug("status of block %s is %s", block_id, self._status[block_id])
else:
logger.debug("block %s has no recorded status", block_id)

kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

# Cancel the blocks provisioned
if self.provider:
logger.info(f"Scaling in jobs: {kill_ids}")

r = self.provider.cancel(kill_ids)
job_ids = self._filter_scale_in_ids(kill_ids, r)
block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
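The scaling change above replaces the old one-task-one-slot accounting: each unfinished task now contributes its requested cores (falling back to scaling_assume_core_slots_per_worker) to the outstanding count, and workers_per_node reports the same assumed slots per worker, so the scaling strategy sizes blocks in core slots rather than in task counts. A simplified, standalone sketch of that accounting (illustrative names, not the executor itself):

```python
from concurrent.futures import Future
from typing import List

# Illustrative sketch (not the executor itself) of slot-based accounting.
SCALING_ASSUME_CORE_SLOTS_PER_WORKER = 4


def outstanding_slots(task_futures: List[Future]) -> int:
    slots = 0
    for fut in task_futures:
        if fut.done():
            continue
        # Each future carries the resource_specification it was submitted with:
        # a task asking for N cores occupies N slots; a task with no core
        # request is assumed to occupy a whole worker's worth of slots.
        spec = getattr(fut, "resource_specification", {})
        slots += spec.get("cores", SCALING_ASSUME_CORE_SLOTS_PER_WORKER)
    return slots


# Example: two pending tasks, one asking for 2 cores and one with no spec.
f1, f2 = Future(), Future()
f1.resource_specification = {"cores": 2}
f2.resource_specification = {}
print(outstanding_slots([f1, f2]))  # 2 + 4 = 6 slots outstanding
```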
59 changes: 59 additions & 0 deletions parsl/tests/test_python_apps/test_dependencies_deep.py
@@ -0,0 +1,59 @@
import inspect
from concurrent.futures import Future
from typing import Any, Callable, Dict

import pytest

import parsl
from parsl.executors.base import ParslExecutor

# N is the number of tasks to chain
# With mid-2024 Parsl, N>140 causes Parsl to hang
N = 100

# MAX_STACK is the maximum Python stack depth allowed for either
# task submission to an executor or execution of a task.
# With mid-2024 Parsl, 2-3 stack entries will be used per
# recursively launched parsl task. So this should be smaller than
# 2*N, but big enough to allow regular pytest+parsl stuff to
# happen.
MAX_STACK = 50


def local_config():
return parsl.Config(executors=[ImmediateExecutor()])


class ImmediateExecutor(ParslExecutor):
def start(self):
pass

def shutdown(self):
pass

def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future:
stack_depth = len(inspect.stack())
assert stack_depth < MAX_STACK, "tasks should not be launched deep in the Python stack"
fut: Future[None] = Future()
res = func(*args, **kwargs)
fut.set_result(res)
return fut


@parsl.python_app
def chain(upstream):
stack_depth = len(inspect.stack())
assert stack_depth < MAX_STACK, "chained dependencies should not be launched deep in the Python stack"


@pytest.mark.local
def test_deep_dependency_stack_depth():

fut = Future()
here = fut

for _ in range(N):
here = chain(here)

fut.set_result(None)
here.result()