Skip to content

Commit

Permalink
Merge branch 'master' into benc-monitoring-test-udp
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Aug 1, 2024
2 parents 59aa7aa + 5ee584d commit cb50c62
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 43 deletions.
2 changes: 1 addition & 1 deletion parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
if self.monitoring:
executor.hub_address = self.monitoring.hub_address
executor.hub_zmq_port = self.monitoring.hub_zmq_port
executor.monitoring_radio = self.monitoring.radio
executor.submit_monitoring_radio = self.monitoring.radio
if hasattr(executor, 'provider'):
if hasattr(executor.provider, 'script_dir'):
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
Expand Down
14 changes: 7 additions & 7 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ def __init__(
*,
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
monitoring_radio: Optional[MonitoringRadioSender] = None,
submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
run_dir: str = ".",
run_id: Optional[str] = None,
):
self.hub_address = hub_address
self.hub_zmq_port = hub_zmq_port
self.monitoring_radio = monitoring_radio
self.submit_monitoring_radio = submit_monitoring_radio
self.run_dir = os.path.abspath(run_dir)
self.run_id = run_id

Expand Down Expand Up @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
self._hub_zmq_port = value

@property
def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
"""Local radio for sending monitoring messages
"""
return self._monitoring_radio
return self._submit_monitoring_radio

@monitoring_radio.setter
def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
self._monitoring_radio = value
@submit_monitoring_radio.setter
def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
self._submit_monitoring_radio = value
34 changes: 23 additions & 11 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from parsl.executors.base import ParslExecutor
from parsl.executors.errors import BadStateException, ScalingFailed
from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
from parsl.jobs.states import JobState, JobStatus
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.monitoring.message_type import MessageType
from parsl.providers.base import ExecutionProvider
from parsl.utils import AtomicIDCounter
Expand Down Expand Up @@ -167,10 +167,18 @@ def tasks(self) -> Dict[object, Future]:
def provider(self):
return self._provider

def _filter_scale_in_ids(self, to_kill, killed):
def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -> Sequence[Any]:
""" Filter out job IDs that were not killed
"""
assert len(to_kill) == len(killed)

if False in killed:
killed_job_ids = [jid for jid, k in zip(to_kill, killed) if k]
not_killed_job_ids = [jid for jid, k in zip(to_kill, killed) if not k]
logger.warning("Some jobs were not killed successfully: "
f"killed jobs: {killed_job_ids}, "
f"not-killed jobs: {not_killed_job_ids}")

# Filters first iterable by bool values in second
return list(compress(to_kill, killed))

Expand Down Expand Up @@ -214,16 +222,20 @@ def scale_in(self, blocks: int) -> List[str]:
:return: A list of block ids corresponding to the blocks that were removed.
"""
# Obtain list of blocks to kill
to_kill = list(self.blocks_to_job_id.keys())[:blocks]
kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

active_blocks = [block_id for block_id, status in self._status.items()
if status.state not in TERMINAL_STATES]

block_ids_to_kill = active_blocks[:blocks]

job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill]

# Cancel the blocks provisioned
if self.provider:
logger.info(f"Scaling in jobs: {kill_ids}")
r = self.provider.cancel(kill_ids)
job_ids = self._filter_scale_in_ids(kill_ids, r)
block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
logger.info(f"Scaling in jobs: {job_ids_to_kill}")
r = self.provider.cancel(job_ids_to_kill)
job_ids = self._filter_scale_in_ids(job_ids_to_kill, r)
block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids]
return block_ids_killed
else:
logger.error("No execution provider available to scale in")
Expand Down Expand Up @@ -261,10 +273,10 @@ def workers_per_node(self) -> Union[int, float]:

def send_monitoring_info(self, status: Dict) -> None:
# Send monitoring info for HTEX when monitoring enabled
if self.monitoring_radio:
if self.submit_monitoring_radio:
msg = self.create_monitoring_info(status)
logger.debug("Sending block monitoring message: %r", msg)
self.monitoring_radio.send((MessageType.BLOCK_INFO, msg))
self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg))

def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]:
"""Create a monitoring message for each block based on the poll status.
Expand Down
49 changes: 32 additions & 17 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ def __init__(self,
logdir: str = ".",
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3 # in seconds
atexit_timeout: int = 3, # in seconds
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
exit_event: Event,
):
""" Initializes a monitoring configuration class.
Expand All @@ -51,7 +56,11 @@ def __init__(self,
Logging level as defined in the logging module. Default: logging.INFO
atexit_timeout : int, optional
The number of seconds to wait without receiving any messages, after the last dfk workflow message is received, before terminating the hub.
*_msgs : Queue
Four multiprocessing queues into which the router places the messages it receives, routed by type tag, and sometimes modified according to type tag.
exit_event : Event
An event that the main Parsl process will set to signal that the monitoring router should shut down.
"""
os.makedirs(logdir, exist_ok=True)
self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
Expand Down Expand Up @@ -93,19 +102,20 @@ def __init__(self,
min_port=zmq_port_range[0],
max_port=zmq_port_range[1])

def start(self,
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
exit_event: Event) -> None:
self.priority_msgs = priority_msgs
self.node_msgs = node_msgs
self.block_msgs = block_msgs
self.resource_msgs = resource_msgs
self.exit_event = exit_event

def start(self) -> None:
try:
while not exit_event.is_set():
while not self.exit_event.is_set():
try:
data, addr = self.udp_sock.recvfrom(2048)
resource_msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
resource_msgs.put((resource_msg, addr))
self.resource_msgs.put((resource_msg, addr))
except socket.timeout:
pass

Expand All @@ -125,15 +135,15 @@ def start(self,

if msg[0] == MessageType.NODE_INFO:
msg[1]['run_id'] = self.run_id
node_msgs.put(msg_0)
self.node_msgs.put(msg_0)
elif msg[0] == MessageType.RESOURCE_INFO:
resource_msgs.put(msg_0)
self.resource_msgs.put(msg_0)
elif msg[0] == MessageType.BLOCK_INFO:
block_msgs.put(msg_0)
self.block_msgs.put(msg_0)
elif msg[0] == MessageType.TASK_INFO:
priority_msgs.put(msg_0)
self.priority_msgs.put(msg_0)
elif msg[0] == MessageType.WORKFLOW_INFO:
priority_msgs.put(msg_0)
self.priority_msgs.put(msg_0)
else:
# There is a type: ignore here because if msg[0]
# is of the correct type, this code is unreachable,
Expand All @@ -158,7 +168,7 @@ def start(self,
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
resource_msgs.put((msg, addr))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass
Expand Down Expand Up @@ -191,7 +201,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
zmq_port_range=zmq_port_range,
logdir=logdir,
logging_level=logging_level,
run_id=run_id)
run_id=run_id,
priority_msgs=priority_msgs,
node_msgs=node_msgs,
block_msgs=block_msgs,
resource_msgs=resource_msgs,
exit_event=exit_event)
except Exception as e:
logger.error("MonitoringRouter construction failed.", exc_info=True)
comm_q.put(f"Monitoring router construction failed: {e}")
Expand All @@ -200,7 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",

router.logger.info("Starting MonitoringRouter in router_starter")
try:
router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event)
router.start()
except Exception as e:
router.logger.exception("router.start exception")
exception_q.put(('Hub', str(e)))
12 changes: 5 additions & 7 deletions parsl/tests/test_htex/test_htex.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ def test_htex_start_encrypted(
@pytest.mark.local
@pytest.mark.parametrize("started", (True, False))
@pytest.mark.parametrize("timeout_expires", (True, False))
@mock.patch(f"{_MOCK_BASE}.logger")
def test_htex_shutdown(
mock_logger: mock.MagicMock,
started: bool,
timeout_expires: bool,
htex: HighThroughputExecutor,
caplog
):
mock_ix_proc = mock.Mock(spec=Popen)

Expand Down Expand Up @@ -110,20 +109,19 @@ def kill_interchange(*args, **kwargs):

htex.shutdown()

mock_logs = mock_logger.info.call_args_list
if started:
assert mock_ix_proc.terminate.called
assert mock_ix_proc.wait.called
assert {"timeout": 10} == mock_ix_proc.wait.call_args[1]
if timeout_expires:
assert "Unable to terminate Interchange" in mock_logs[1][0][0]
assert "Unable to terminate Interchange" in caplog.text
assert mock_ix_proc.kill.called
assert "Attempting" in mock_logs[0][0][0]
assert "Finished" in mock_logs[-1][0][0]
assert "Attempting HighThroughputExecutor shutdown" in caplog.text
assert "Finished HighThroughputExecutor shutdown" in caplog.text
else:
assert not mock_ix_proc.terminate.called
assert not mock_ix_proc.wait.called
assert "has not started" in mock_logs[0][0][0]
assert "HighThroughputExecutor has not started" in caplog.text


@pytest.mark.local
Expand Down

0 comments on commit cb50c62

Please sign in to comment.