Skip to content

Commit

Permalink
Merge branch 'master' into benc-channels-hard-deprecate
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Aug 7, 2024
2 parents a16a846 + 10a6a00 commit 42c5a56
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None:
if self.monitoring:
if self.monitoring.logdir is None:
self.monitoring.logdir = self.run_dir
self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)
self.monitoring.start(self.run_dir, self.config.run_dir)

self.time_began = datetime.datetime.now()
self.time_completed: Optional[datetime.datetime] = None
Expand Down
1 change: 1 addition & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ def _start_local_interchange_process(self) -> None:
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
"cert_dir": self.cert_dir,
"manager_selector": self.manager_selector,
"run_id": self.run_id,
}

config_pickle = pickle.dumps(interchange_config)
Expand Down
4 changes: 4 additions & 0 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self,
poll_period: int,
cert_dir: Optional[str],
manager_selector: ManagerSelector,
run_id: str,
) -> None:
"""
Parameters
Expand Down Expand Up @@ -125,6 +126,8 @@ def __init__(self,
self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2]))
logger.info("Connected to client")

self.run_id = run_id

self.hub_address = hub_address
self.hub_zmq_port = hub_zmq_port

Expand Down Expand Up @@ -227,6 +230,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender
d: Dict = cast(Dict, manager.copy())
d['timestamp'] = datetime.datetime.now()
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])
d['run_id'] = self.run_id

monitoring_radio.send((MessageType.NODE_INFO, d))

Expand Down
3 changes: 1 addition & 2 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(self,
self.resource_monitoring_enabled = resource_monitoring_enabled
self.resource_monitoring_interval = resource_monitoring_interval

def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:

logger.debug("Starting MonitoringHub")

Expand Down Expand Up @@ -161,7 +161,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
"zmq_port_range": self.hub_port_range,
"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"run_id": run_id
},
name="Monitoring-Router-Process",
daemon=True,
Expand Down
7 changes: 1 addition & 6 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self,

monitoring_hub_address: str = "127.0.0.1",
logdir: str = ".",
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
Expand Down Expand Up @@ -71,7 +70,6 @@ def __init__(self,

self.hub_address = hub_address
self.atexit_timeout = atexit_timeout
self.run_id = run_id

self.loop_freq = 10.0 # milliseconds

Expand Down Expand Up @@ -172,7 +170,6 @@ def start_zmq_listener(self) -> None:
msg_0 = (msg, 0)

if msg[0] == MessageType.NODE_INFO:
msg[1]['run_id'] = self.run_id
self.node_msgs.put(msg_0)
elif msg[0] == MessageType.RESOURCE_INFO:
self.resource_msgs.put(msg_0)
Expand Down Expand Up @@ -218,16 +215,14 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
zmq_port_range: Tuple[int, int],

logdir: str,
logging_level: int,
run_id: str) -> None:
logging_level: int) -> None:
setproctitle("parsl: monitoring router")
try:
router = MonitoringRouter(hub_address=hub_address,
udp_port=udp_port,
zmq_port_range=zmq_port_range,
logdir=logdir,
logging_level=logging_level,
run_id=run_id,
priority_msgs=priority_msgs,
node_msgs=node_msgs,
block_msgs=block_msgs,
Expand Down
3 changes: 2 additions & 1 deletion parsl/tests/test_htex/test_zmq_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s
logdir=".",
logging_level=logging.INFO,
manager_selector=RandomManagerSelector(),
poll_period=10)
poll_period=10,
run_id="test_run_id")


@pytest.fixture
Expand Down

0 comments on commit 42c5a56

Please sign in to comment.