Commit

Merge remote-tracking branch 'origin/master' into benc-monitoring-interchange-radio
benclifford committed Aug 2, 2024
2 parents 21dd3aa + 2b01411 · commit bf14d49
Showing 4 changed files with 94 additions and 13 deletions.
27 changes: 15 additions & 12 deletions parsl/executors/status_handling.py
@@ -183,31 +183,34 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -
         return list(compress(to_kill, killed))
 
     def scale_out_facade(self, n: int) -> List[str]:
-        block_ids = self._scale_out(n)
-        new_status = {}
-        for block_id in block_ids:
-            new_status[block_id] = JobStatus(JobState.PENDING)
-        self.send_monitoring_info(new_status)
-        self._status.update(new_status)
-        return block_ids
-
-    def _scale_out(self, blocks: int = 1) -> List[str]:
         """Scales out the number of blocks by "blocks"
         """
         if not self.provider:
             raise ScalingFailed(self, "No execution provider available")
         block_ids = []
-        logger.info(f"Scaling out by {blocks} blocks")
-        for _ in range(blocks):
+        monitoring_status_changes = {}
+        logger.info(f"Scaling out by {n} blocks")
+        for _ in range(n):
             block_id = str(self._block_id_counter.get_id())
             logger.info(f"Allocated block ID {block_id}")
             try:
                 job_id = self._launch_block(block_id)
+
+                pending_status = JobStatus(JobState.PENDING)
+
                 self.blocks_to_job_id[block_id] = job_id
                 self.job_ids_to_block[job_id] = block_id
+                self._status[block_id] = pending_status
+
+                monitoring_status_changes[block_id] = pending_status
                 block_ids.append(block_id)
+
             except Exception as ex:
-                self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                self._simulated_status[block_id] = failed_status
+                self._status[block_id] = failed_status
+
+        self.send_monitoring_info(monitoring_status_changes)
         return block_ids
 
     def scale_in(self, blocks: int) -> List[str]:
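This change folds _scale_out into scale_out_facade: each block's status is now recorded in self._status as it is launched (PENDING on success, FAILED when the provider raises), and monitoring is told about both outcomes in one batched send_monitoring_info call, where previously failed launches were not reported from this path. Below is a minimal, runnable sketch of that batching pattern; the names launch_block and scale_out and the string statuses are illustrative stand-ins, not Parsl's real API.

# Minimal sketch (stand-ins, not Parsl's classes) of the pattern the new
# scale_out_facade uses: record a status for every block, successful or
# failed, then notify monitoring once with a single batched dict.
from typing import Dict


def send_monitoring_info(status_changes: Dict[str, str]) -> None:
    # stand-in for the real monitoring send
    print(f"monitoring update: {status_changes}")


def launch_block(block_id: str) -> str:
    # stand-in launcher; block "1" simulates a provider failure
    if block_id == "1":
        raise RuntimeError("simulated provider failure")
    return f"job-{block_id}"


def scale_out(n: int) -> Dict[str, str]:
    status_changes: Dict[str, str] = {}
    for i in range(n):
        block_id = str(i)
        try:
            launch_block(block_id)
            status_changes[block_id] = "PENDING"
        except Exception as ex:
            status_changes[block_id] = f"FAILED: {ex}"
    send_monitoring_info(status_changes)  # one send covers every block
    return status_changes


scale_out(2)  # -> {'0': 'PENDING', '1': 'FAILED: simulated provider failure'}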
6 changes: 6 additions & 0 deletions parsl/monitoring/errors.py
@@ -0,0 +1,6 @@
+from parsl.errors import ParslError
+
+
+class MonitoringHubStartError(ParslError):
+    def __str__(self) -> str:
+        return "Hub failed to start"
3 changes: 2 additions & 1 deletion parsl/monitoring/monitoring.py
@@ -12,6 +12,7 @@
 import typeguard
 
 from parsl.log_utils import set_file_logger
+from parsl.monitoring.errors import MonitoringHubStartError
 from parsl.monitoring.message_type import MessageType
 from parsl.monitoring.radios import MultiprocessingQueueRadioSender
 from parsl.monitoring.router import router_starter
@@ -195,7 +196,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
             comm_q.join_thread()
         except queue.Empty:
             logger.error("Hub has not completed initialization in 120s. Aborting")
-            raise Exception("Hub failed to start")
+            raise MonitoringHubStartError()
 
         if isinstance(comm_q_result, str):
             logger.error(f"MonitoringRouter sent an error message: {comm_q_result}")
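With a typed error, callers that trigger monitoring startup can handle hub failure distinctly from other problems. A hedged sketch follows: it assumes that parsl.load() starts the MonitoringHub when the Config has monitoring enabled, and load_or_report is a hypothetical helper, not part of this commit.

import parsl
from parsl.config import Config
from parsl.monitoring.errors import MonitoringHubStartError


def load_or_report(config: Config) -> None:
    # Hypothetical helper (assumption, not from this commit): parsl.load()
    # starts the MonitoringHub for monitoring-enabled configs, so a hub that
    # does not initialize within its 120s window surfaces here as a typed error.
    try:
        parsl.load(config)
    except MonitoringHubStartError:
        print("monitoring hub failed to start; check the router logs")
        raise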
71 changes: 71 additions & 0 deletions parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
@@ -0,0 +1,71 @@
+import logging
+
+import pytest
+
+import parsl
+from parsl import Config
+from parsl.executors import HighThroughputExecutor
+from parsl.executors.errors import BadStateException
+from parsl.jobs.states import JobState, JobStatus
+from parsl.providers import LocalProvider
+
+
+class FailingProvider(LocalProvider):
+    def submit(*args, **kwargs):
+        raise RuntimeError("Deliberate failure of provider.submit")
+
+
+def local_config():
+    """Config to simulate failing blocks without connecting"""
+    return Config(
+        executors=[
+            HighThroughputExecutor(
+                label="HTEX",
+                heartbeat_period=1,
+                heartbeat_threshold=2,
+                poll_period=100,
+                max_workers_per_node=1,
+                provider=FailingProvider(
+                    init_blocks=0,
+                    max_blocks=2,
+                    min_blocks=0,
+                ),
+            )
+        ],
+        max_idletime=0.5,
+        strategy='htex_auto_scale',
+        strategy_period=0.1
+        # this strategy period needs to be a few times smaller than the
+        # status_polling_interval of FailingProvider, which is 5s at
+        # time of writing
+    )
+
+
+@parsl.python_app
+def double(x):
+    return x * 2
+
+
+@pytest.mark.local
+def test_disconnected_blocks():
+    """Test reporting of blocks that fail to connect from HTEX"""
+    dfk = parsl.dfk()
+    executor = dfk.executors["HTEX"]
+
+    connected_blocks = executor.connected_blocks()
+    assert not connected_blocks, "Expected 0 blocks"
+
+    future = double(5)
+    with pytest.raises(BadStateException):
+        future.result()
+
+    assert isinstance(future.exception(), BadStateException)
+
+    status_dict = executor.status()
+    assert len(status_dict) == 1, "Expected exactly 1 block"
+    for status in status_dict.values():
+        assert isinstance(status, JobStatus)
+        assert status.state == JobState.MISSING
+
+    connected_blocks = executor.connected_blocks()
+    assert connected_blocks == [], "Expected exactly 0 connected blocks"
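To run this test outside parsl's pytest harness, a rough equivalent of what the harness does for a @pytest.mark.local test is sketched below; the assumption that the harness loads the module's local_config() before the test body is mine, not stated in this commit.

# Hedged sketch: approximate standalone driver for the test above, assuming
# parsl's harness loads local_config() for @pytest.mark.local tests.
import parsl
from parsl.tests.test_htex.test_disconnected_blocks_failing_provider import (
    local_config,
    test_disconnected_blocks,
)

parsl.load(local_config())
try:
    test_disconnected_blocks()
finally:
    parsl.dfk().cleanup()  # shut down the DataFlowKernel and its executors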
