diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 23afd37749..d65388e7cb 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -220,6 +220,7 @@ class WorkQueueExecutor(BlockProviderExecutor, putils.RepresentationMixin): # TODO: this should be configurable: there's no definite preference for # results radio vs filesystem mode. # radio_mode = "results" + # radio_mode = "filesystem" @typeguard.typechecked def __init__(self, diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 2a8e0733e2..16ad050def 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -250,6 +250,12 @@ class Resource(Base): 'psutil_process_disk_write', Float, nullable=True) psutil_process_status = Column( 'psutil_process_status', Text, nullable=True) + psutil_cpu_num = Column( + 'psutil_cpu_num', Text, nullable=True) + psutil_process_num_ctx_switches_voluntary = Column( + 'psutil_process_num_ctx_switches_voluntary', Float, nullable=True) + psutil_process_num_ctx_switches_involuntary = Column( + 'psutil_process_num_ctx_switches_involuntary', Float, nullable=True) __table_args__ = ( PrimaryKeyConstraint('try_id', 'task_id', 'run_id', 'timestamp'), ) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index f925b1bb25..e367d3d5c7 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -257,6 +257,8 @@ def monitor(pid: int, children_user_time = {} # type: Dict[int, float] children_system_time = {} # type: Dict[int, float] + children_num_ctx_switches_voluntary = {} # type: Dict[int, float] + children_num_ctx_switches_involuntary = {} # type: Dict[int, float] def accumulate_and_prepare() -> Dict[str, Any]: d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple} @@ -274,6 +276,19 @@ def accumulate_and_prepare() -> Dict[str, Any]: logging.debug("got children") d["psutil_cpu_count"] = psutil.cpu_count() + + # note that this will be the CPU of the base process, not anything launched by it + d["psutil_cpu_num"] = pm.cpu_num() + + # TODO: this is a structured tuple, so maybe it should be broken up, and the + # fields that make sense summed over children. This is a str() here because + # on the receiving side, sqalchemy can't turn a tuple into a text sql field... + + pctxsw = pm.num_ctx_switches() + + d["psutil_process_num_ctx_switches_voluntary"] = pctxsw.voluntary + d["psutil_process_num_ctx_switches_involuntary"] = pctxsw.involuntary + d['psutil_process_memory_virtual'] = pm.memory_info().vms d['psutil_process_memory_resident'] = pm.memory_info().rss d['psutil_process_time_user'] = pm.cpu_times().user @@ -294,6 +309,11 @@ def accumulate_and_prepare() -> Dict[str, Any]: child_system_time = child.cpu_times().system children_user_time[child.pid] = child_user_time children_system_time[child.pid] = child_system_time + + pctxsw = child.num_ctx_switches() + children_num_ctx_switches_voluntary[child.pid] = pctxsw.voluntary + children_num_ctx_switches_involuntary[child.pid] = pctxsw.involuntary + d['psutil_process_memory_virtual'] += child.memory_info().vms d['psutil_process_memory_resident'] += child.memory_info().rss try: @@ -304,15 +324,28 @@ def accumulate_and_prepare() -> Dict[str, Any]: logging.exception("Exception reading IO counters for child {k}. Recorded IO usage may be incomplete".format(k=k), exc_info=True) d['psutil_process_disk_write'] += 0 d['psutil_process_disk_read'] += 0 + total_children_user_time = 0.0 for child_pid in children_user_time: total_children_user_time += children_user_time[child_pid] + total_children_system_time = 0.0 for child_pid in children_system_time: total_children_system_time += children_system_time[child_pid] + + total_children_num_ctx_switches_voluntary = 0.0 + for child_pid in children_num_ctx_switches_voluntary: + total_children_num_ctx_switches_voluntary += children_num_ctx_switches_voluntary[child_pid] + + total_children_num_ctx_switches_involuntary = 0.0 + for child_pid in children_num_ctx_switches_involuntary: + total_children_num_ctx_switches_involuntary += children_num_ctx_switches_involuntary[child_pid] + d['psutil_process_time_user'] += total_children_user_time d['psutil_process_time_system'] += total_children_system_time - logging.debug("sending message") + d['psutil_process_num_ctx_switches_voluntary'] += total_children_num_ctx_switches_voluntary + d['psutil_process_num_ctx_switches_involuntary'] += total_children_num_ctx_switches_involuntary + logging.debug("returning dict from accumulate and prepare") return d next_send = time.time() @@ -323,7 +356,7 @@ def accumulate_and_prepare() -> Dict[str, Any]: try: d = accumulate_and_prepare() if time.time() >= next_send: - logging.debug("Sending intermediate resource message") + logging.debug(f"Sending intermediate resource message: {d}") radio.send((MessageType.RESOURCE_INFO, d)) next_send += sleep_dur except Exception: @@ -337,9 +370,10 @@ def accumulate_and_prepare() -> Dict[str, Any]: terminate_event.wait(max(0, min(next_send - time.time(), accumulate_dur))) - logging.debug("Sending final resource message") try: + logging.debug("Preparing final resource message") d = accumulate_and_prepare() + logging.debug(f"Sending final resource message: {d}") radio.send((MessageType.RESOURCE_INFO, d)) except Exception: logging.exception("Exception getting the resource usage. Not sending final usage to Hub", exc_info=True) @@ -351,4 +385,4 @@ def accumulate_and_prepare() -> Dict[str, Any]: logging.debug("Sending result_radio_queue") terminate_queue.put(result_radio_queue) - logging.debug("End of monitoring helper") + logging.info("End of monitoring helper") diff --git a/parsl/tests/configs/workqueue_ex.py b/parsl/tests/configs/workqueue_ex.py index 062ae95d65..1b9957381e 100644 --- a/parsl/tests/configs/workqueue_ex.py +++ b/parsl/tests/configs/workqueue_ex.py @@ -17,5 +17,6 @@ def fresh_config(): monitoring=MonitoringHub(hub_address="localhost", hub_port=55055, monitoring_debug=True, + resource_monitoring_enabled=True, resource_monitoring_interval=1, ))