Skip to content

Commit

Permalink
Merge PollingExecutorFacade monitoring_enabled and hub_channel (#3322)
Browse files Browse the repository at this point in the history
self.monitoring_enabled = True is equivalent to self.hub_channel being assigned.

This PR uses a more type-driven style of a single Optional for hub_channel,
removing monitoring_enabled.

This PR should not change behaviour.
  • Loading branch information
benclifford authored Apr 9, 2024
1 parent 13e8940 commit 026e014
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,19 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
self._status = {} # type: Dict[str, JobStatus]

# Create a ZMQ channel to send poll status to monitoring
self.monitoring_enabled = False

self.hub_channel: Optional[zmq.Socket]

if dfk and dfk.monitoring is not None:
self.monitoring_enabled = True
hub_address = dfk.hub_address
hub_port = dfk.hub_zmq_port
context = zmq.Context()
self.hub_channel = context.socket(zmq.DEALER)
self.hub_channel.set_hwm(0)
self.hub_channel.connect("tcp://{}:{}".format(hub_address, hub_port))
logger.info("Monitoring enabled on job status poller")
else:
self.hub_channel = None

def _should_poll(self, now: float) -> bool:
return now >= self._last_poll_time + self._executor.status_polling_interval
Expand All @@ -54,7 +57,7 @@ def poll(self) -> None:

def send_monitoring_info(self, status: Dict) -> None:
# Send monitoring info for HTEX when monitoring enabled
if self.monitoring_enabled:
if self.hub_channel:
msg = self._executor.create_monitoring_info(status)
logger.debug("Sending message {} to hub from job status poller".format(msg))
self.hub_channel.send_pyobj((MessageType.BLOCK_INFO, msg))
Expand Down

0 comments on commit 026e014

Please sign in to comment.