Merge branch 'master' into benc-k8s-kind-ci
benclifford authored Aug 5, 2024
2 parents 780fbb0 + d8e8d4b commit 2324744
Showing 11 changed files with 225 additions and 88 deletions.
2 changes: 1 addition & 1 deletion parsl/dataflow/dflow.py
@@ -1180,7 +1180,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
if self.monitoring:
executor.hub_address = self.monitoring.hub_address
executor.hub_zmq_port = self.monitoring.hub_zmq_port
executor.monitoring_radio = self.monitoring.radio
executor.submit_monitoring_radio = self.monitoring.radio
if hasattr(executor, 'provider'):
if hasattr(executor.provider, 'script_dir'):
executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
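
The only change in this file is the attribute rename: the DataFlowKernel now hands its own radio to each executor under the name submit_monitoring_radio, making clear that this is the radio used on the submit side. A minimal sketch of what that object looks like, assuming the DFK-side radio is the queue-backed sender imported in monitoring.py (the payload dict is illustrative, not part of this diff):

from multiprocessing import Queue

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MultiprocessingQueueRadioSender

# A queue-backed sender like the one the DFK attaches as submit_monitoring_radio.
submit_monitoring_radio = MultiprocessingQueueRadioSender(Queue())

# Executors send (MessageType, payload) tuples through it; the payload shape here is made up.
submit_monitoring_radio.send((MessageType.BLOCK_INFO, {"block_id": "0", "status": "PENDING"}))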
14 changes: 7 additions & 7 deletions parsl/executors/base.py
@@ -52,13 +52,13 @@ def __init__(
*,
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
monitoring_radio: Optional[MonitoringRadioSender] = None,
submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
run_dir: str = ".",
run_id: Optional[str] = None,
):
self.hub_address = hub_address
self.hub_zmq_port = hub_zmq_port
self.monitoring_radio = monitoring_radio
self.submit_monitoring_radio = submit_monitoring_radio
self.run_dir = os.path.abspath(run_dir)
self.run_id = run_id

@@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
self._hub_zmq_port = value

@property
def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
"""Local radio for sending monitoring messages
"""
return self._monitoring_radio
return self._submit_monitoring_radio

@monitoring_radio.setter
def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
self._monitoring_radio = value
@submit_monitoring_radio.setter
def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
self._submit_monitoring_radio = value
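
The property keeps its None-by-default behaviour under the new name, so call sites still guard before sending. A short sketch of that guard pattern, mirroring send_monitoring_info() in status_handling.py further down (the helper name is illustrative):

from typing import Optional

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender


def send_if_monitoring(radio: Optional[MonitoringRadioSender], payload: dict) -> None:
    # submit_monitoring_radio is None when monitoring is disabled, so check before sending.
    if radio is not None:
        radio.send((MessageType.BLOCK_INFO, payload))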
65 changes: 31 additions & 34 deletions parsl/executors/high_throughput/interchange.py
@@ -20,6 +20,7 @@
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.utils import setproctitle
@@ -219,36 +220,27 @@ def task_puller(self) -> NoReturn:
task_counter += 1
logger.debug(f"Fetched {task_counter} tasks so far")

def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
if self.hub_address and self.hub_zmq_port:
logger.info("Connecting to MonitoringHub")
# This is a one-off because monitoring is unencrypted
hub_channel = zmq.Context().socket(zmq.DEALER)
hub_channel.set_hwm(0)
hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port))
logger.info("Connected to MonitoringHub")
return hub_channel
else:
return None

def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None:
if hub_channel:
def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
if monitoring_radio:
logger.info("Sending message {} to MonitoringHub".format(manager))

d: Dict = cast(Dict, manager.copy())
d['timestamp'] = datetime.datetime.now()
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])

hub_channel.send_pyobj((MessageType.NODE_INFO, d))
monitoring_radio.send((MessageType.NODE_INFO, d))

@wrap_with_logs(target="interchange")
def _command_server(self) -> NoReturn:
""" Command server to run async command to the interchange
"""
logger.debug("Command Server Starting")

# Need to create a new ZMQ socket for command server thread
hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
else:
monitoring_radio = None

reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...)

@@ -298,7 +290,7 @@ def _command_server(self) -> NoReturn:
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)
else:
logger.warning("Worker to hold was not in ready managers list")

@@ -333,9 +325,14 @@ def start(self) -> None:
# parent-process-inheritance problems.
signal.signal(signal.SIGTERM, signal.SIG_DFL)

logger.info("Incoming ports bound")
logger.info("Starting main interchange method")

hub_channel = self._create_monitoring_channel()
if self.hub_address is not None and self.hub_zmq_port is not None:
logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
logger.debug("Created monitoring radio")
else:
monitoring_radio = None

poll_period = self.poll_period

@@ -366,10 +363,10 @@ def start(self) -> None:
while not kill_event.is_set():
self.socks = dict(poller.poll(timeout=poll_period))

self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event)
self.process_results_incoming(interesting_managers, hub_channel)
self.expire_bad_managers(interesting_managers, hub_channel)
self.expire_drained_managers(interesting_managers, hub_channel)
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
self.process_results_incoming(interesting_managers, monitoring_radio)
self.expire_bad_managers(interesting_managers, monitoring_radio)
self.expire_drained_managers(interesting_managers, monitoring_radio)
self.process_tasks_to_send(interesting_managers)

self.zmq_context.destroy()
@@ -380,7 +377,7 @@ def start(self) -> None:
def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
hub_channel: Optional[zmq.Socket],
monitoring_radio: Optional[MonitoringRadioSender],
kill_event: threading.Event
) -> None:
"""Process one message from manager on the task_outgoing channel.
@@ -434,7 +431,7 @@ def process_task_outgoing_incoming(
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
@@ -465,7 +462,7 @@ def process_task_outgoing_incoming(
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")

def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:

for manager_id in list(interesting_managers):
# is it always true that a draining manager will be in interesting managers?
@@ -478,7 +475,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel:
self._ready_managers.pop(manager_id)

m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers
@@ -521,7 +518,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
# Receive any results and forward to client
if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN:
logger.debug("entering results_incoming section")
@@ -541,11 +538,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
elif r['type'] == 'monitoring':
# the monitoring code makes the assumption that no
# monitoring messages will be received if monitoring
# is not configured, and that hub_channel will only
# is not configured, and that monitoring_radio will only
# be None when monitoring is not configured.
assert hub_channel is not None
assert monitoring_radio is not None

hub_channel.send_pyobj(r['payload'])
monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
b_messages.append((p_message, r))
@@ -589,15 +586,15 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
interesting_managers.add(manager_id)
logger.debug("leaving results_incoming section")

def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
if m['active']:
m['active'] = False
self._send_monitoring_info(hub_channel, m)
self._send_monitoring_info(monitoring_radio, m)

logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager")
for tid in m['tasks']:
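
With the helpers now typed against MonitoringRadioSender instead of a raw zmq.Socket, any object implementing send() can stand in for the hub connection, for example in a unit test. A sketch of such a stand-in, assuming MonitoringRadioSender can be subclassed directly (parsl does not necessarily ship this stub):

from parsl.monitoring.radios import MonitoringRadioSender


class CapturingRadioSender(MonitoringRadioSender):
    """In-memory sender that records messages instead of shipping them over ZMQ."""

    def __init__(self) -> None:
        self.messages: list = []

    def send(self, message: object) -> None:
        self.messages.append(message)


# Anything taking Optional[MonitoringRadioSender], such as _send_monitoring_info above,
# can be exercised against this stub and its captured messages inspected afterwards.
radio = CapturingRadioSender()
radio.send(("NODE_INFO", {"hostname": "example"}))  # illustrative payload
assert len(radio.messages) == 1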
51 changes: 29 additions & 22 deletions parsl/executors/status_handling.py
@@ -12,7 +12,7 @@
from parsl.executors.base import ParslExecutor
from parsl.executors.errors import BadStateException, ScalingFailed
from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
from parsl.jobs.states import JobState, JobStatus
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.monitoring.message_type import MessageType
from parsl.providers.base import ExecutionProvider
from parsl.utils import AtomicIDCounter
@@ -183,31 +183,34 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -
return list(compress(to_kill, killed))

def scale_out_facade(self, n: int) -> List[str]:
block_ids = self._scale_out(n)
new_status = {}
for block_id in block_ids:
new_status[block_id] = JobStatus(JobState.PENDING)
self.send_monitoring_info(new_status)
self._status.update(new_status)
return block_ids

def _scale_out(self, blocks: int = 1) -> List[str]:
"""Scales out the number of blocks by "blocks"
"""
if not self.provider:
raise ScalingFailed(self, "No execution provider available")
block_ids = []
logger.info(f"Scaling out by {blocks} blocks")
for _ in range(blocks):
monitoring_status_changes = {}
logger.info(f"Scaling out by {n} blocks")
for _ in range(n):
block_id = str(self._block_id_counter.get_id())
logger.info(f"Allocated block ID {block_id}")
try:
job_id = self._launch_block(block_id)

pending_status = JobStatus(JobState.PENDING)

self.blocks_to_job_id[block_id] = job_id
self.job_ids_to_block[job_id] = block_id
self._status[block_id] = pending_status

monitoring_status_changes[block_id] = pending_status
block_ids.append(block_id)

except Exception as ex:
self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
self._simulated_status[block_id] = failed_status
self._status[block_id] = failed_status

self.send_monitoring_info(monitoring_status_changes)
return block_ids

def scale_in(self, blocks: int) -> List[str]:
@@ -222,16 +225,20 @@ def scale_in(self, blocks: int) -> List[str]:
:return: A list of block ids corresponding to the blocks that were removed.
"""
# Obtain list of blocks to kill
to_kill = list(self.blocks_to_job_id.keys())[:blocks]
kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

active_blocks = [block_id for block_id, status in self._status.items()
if status.state not in TERMINAL_STATES]

block_ids_to_kill = active_blocks[:blocks]

job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill]

# Cancel the blocks provisioned
if self.provider:
logger.info(f"Scaling in jobs: {kill_ids}")
r = self.provider.cancel(kill_ids)
job_ids = self._filter_scale_in_ids(kill_ids, r)
block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
logger.info(f"Scaling in jobs: {job_ids_to_kill}")
r = self.provider.cancel(job_ids_to_kill)
job_ids = self._filter_scale_in_ids(job_ids_to_kill, r)
block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids]
return block_ids_killed
else:
logger.error("No execution provider available to scale in")
@@ -269,10 +276,10 @@ def workers_per_node(self) -> Union[int, float]:

def send_monitoring_info(self, status: Dict) -> None:
# Send monitoring info for HTEX when monitoring enabled
if self.monitoring_radio:
if self.submit_monitoring_radio:
msg = self.create_monitoring_info(status)
logger.debug("Sending block monitoring message: %r", msg)
self.monitoring_radio.send((MessageType.BLOCK_INFO, msg))
self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg))

def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]:
"""Create a monitoring message for each block based on the poll status.
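
scale_in now draws its victims only from blocks whose last known status is non-terminal, so blocks that already failed or finished no longer use up the scale-in budget. The selection logic in isolation, with an illustrative status table standing in for self._status:

from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus

# Hypothetical contents of self._status on an executor.
status = {
    "0": JobStatus(JobState.RUNNING),
    "1": JobStatus(JobState.FAILED, "simulated failure"),  # terminal, never selected
    "2": JobStatus(JobState.PENDING),
}

blocks = 2
active_blocks = [block_id for block_id, s in status.items()
                 if s.state not in TERMINAL_STATES]
block_ids_to_kill = active_blocks[:blocks]
print(block_ids_to_kill)  # ['0', '2']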
6 changes: 6 additions & 0 deletions parsl/monitoring/errors.py
@@ -0,0 +1,6 @@
from parsl.errors import ParslError


class MonitoringHubStartError(ParslError):
def __str__(self) -> str:
return "Hub failed to start"
3 changes: 2 additions & 1 deletion parsl/monitoring/monitoring.py
@@ -12,6 +12,7 @@
import typeguard

from parsl.log_utils import set_file_logger
from parsl.monitoring.errors import MonitoringHubStartError
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
@@ -195,7 +196,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
comm_q.join_thread()
except queue.Empty:
logger.error("Hub has not completed initialization in 120s. Aborting")
raise Exception("Hub failed to start")
raise MonitoringHubStartError()

if isinstance(comm_q_result, str):
logger.error(f"MonitoringRouter sent an error message: {comm_q_result}")
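
Since the hub start-up timeout now raises a typed error rather than a bare Exception, a workflow script can handle it specifically. A hedged sketch, assuming the exception propagates out of parsl.load() when the DataFlowKernel starts monitoring (config construction elided):

import parsl
from parsl.monitoring.errors import MonitoringHubStartError


def load_with_monitoring(config):
    """config: a parsl.config.Config with a MonitoringHub attached (not shown here)."""
    try:
        return parsl.load(config)
    except MonitoringHubStartError:
        print("Monitoring hub did not come up within 120s; check the monitoring logs")
        raise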
16 changes: 16 additions & 0 deletions parsl/monitoring/radios.py
@@ -7,6 +7,8 @@
from multiprocessing.queues import Queue
from typing import Optional

import zmq

from parsl.serialize import serialize

_db_manager_excepts: Optional[Exception]
@@ -186,3 +188,17 @@ def __init__(self, queue: Queue) -> None:

def send(self, message: object) -> None:
self.queue.put((message, 0))


class ZMQRadioSender(MonitoringRadioSender):
"""A monitoring radio which connects over ZMQ. This radio is not
thread-safe, because its use of ZMQ is not thread-safe.
"""

def __init__(self, hub_address: str, hub_zmq_port: int) -> None:
self._hub_channel = zmq.Context().socket(zmq.DEALER)
self._hub_channel.set_hwm(0)
self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}")

def send(self, message: object) -> None:
self._hub_channel.send_pyobj(message)
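
ZMQRadioSender packages up what the interchange previously did inline: a DEALER socket with an unlimited high-water mark, connected to the hub, sending pickled objects. A minimal usage sketch; the address, port and payload are placeholders, and as noted above one sender should not be shared across threads:

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import ZMQRadioSender

radio = ZMQRadioSender("127.0.0.1", 55055)  # placeholder hub address and port
radio.send((MessageType.NODE_INFO, {"hostname": "example-node"}))  # illustrative payload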