Skip to content

Commit

Permalink
add some process monitoring fields at request of david adams
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Nov 13, 2023
1 parent 1ab0109 commit eca9b9d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 4 deletions.
1 change: 1 addition & 0 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class WorkQueueExecutor(BlockProviderExecutor, putils.RepresentationMixin):
# TODO: this should be configurable: there's no definite preference for
# results radio vs filesystem mode.
# radio_mode = "results"
# radio_mode = "filesystem"

@typeguard.typechecked
def __init__(self,
Expand Down
6 changes: 6 additions & 0 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ class Resource(Base):
'psutil_process_disk_write', Float, nullable=True)
psutil_process_status = Column(
'psutil_process_status', Text, nullable=True)
psutil_cpu_num = Column(
'psutil_cpu_num', Text, nullable=True)
psutil_process_num_ctx_switches_voluntary = Column(
'psutil_process_num_ctx_switches_voluntary', Float, nullable=True)
psutil_process_num_ctx_switches_involuntary = Column(
'psutil_process_num_ctx_switches_involuntary', Float, nullable=True)
__table_args__ = (
PrimaryKeyConstraint('try_id', 'task_id', 'run_id', 'timestamp'),
)
Expand Down
42 changes: 38 additions & 4 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ def monitor(pid: int,

children_user_time = {} # type: Dict[int, float]
children_system_time = {} # type: Dict[int, float]
children_num_ctx_switches_voluntary = {} # type: Dict[int, float]
children_num_ctx_switches_involuntary = {} # type: Dict[int, float]

def accumulate_and_prepare() -> Dict[str, Any]:
d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple}
Expand All @@ -274,6 +276,19 @@ def accumulate_and_prepare() -> Dict[str, Any]:
logging.debug("got children")

d["psutil_cpu_count"] = psutil.cpu_count()

# note that this will be the CPU of the base process, not anything launched by it
d["psutil_cpu_num"] = pm.cpu_num()

# TODO: this is a structured tuple, so maybe it should be broken up, and the
# fields that make sense summed over children. This is a str() here because
# on the receiving side, sqalchemy can't turn a tuple into a text sql field...

pctxsw = pm.num_ctx_switches()

d["psutil_process_num_ctx_switches_voluntary"] = pctxsw.voluntary
d["psutil_process_num_ctx_switches_involuntary"] = pctxsw.involuntary

d['psutil_process_memory_virtual'] = pm.memory_info().vms
d['psutil_process_memory_resident'] = pm.memory_info().rss
d['psutil_process_time_user'] = pm.cpu_times().user
Expand All @@ -294,6 +309,11 @@ def accumulate_and_prepare() -> Dict[str, Any]:
child_system_time = child.cpu_times().system
children_user_time[child.pid] = child_user_time
children_system_time[child.pid] = child_system_time

pctxsw = child.num_ctx_switches()
children_num_ctx_switches_voluntary[child.pid] = pctxsw.voluntary
children_num_ctx_switches_involuntary[child.pid] = pctxsw.involuntary

d['psutil_process_memory_virtual'] += child.memory_info().vms
d['psutil_process_memory_resident'] += child.memory_info().rss
try:
Expand All @@ -304,15 +324,28 @@ def accumulate_and_prepare() -> Dict[str, Any]:
logging.exception("Exception reading IO counters for child {k}. Recorded IO usage may be incomplete".format(k=k), exc_info=True)
d['psutil_process_disk_write'] += 0
d['psutil_process_disk_read'] += 0

total_children_user_time = 0.0
for child_pid in children_user_time:
total_children_user_time += children_user_time[child_pid]

total_children_system_time = 0.0
for child_pid in children_system_time:
total_children_system_time += children_system_time[child_pid]

total_children_num_ctx_switches_voluntary = 0.0
for child_pid in children_num_ctx_switches_voluntary:
total_children_num_ctx_switches_voluntary += children_num_ctx_switches_voluntary[child_pid]

total_children_num_ctx_switches_involuntary = 0.0
for child_pid in children_num_ctx_switches_involuntary:
total_children_num_ctx_switches_involuntary += children_num_ctx_switches_involuntary[child_pid]

d['psutil_process_time_user'] += total_children_user_time
d['psutil_process_time_system'] += total_children_system_time
logging.debug("sending message")
d['psutil_process_num_ctx_switches_voluntary'] += total_children_num_ctx_switches_voluntary
d['psutil_process_num_ctx_switches_involuntary'] += total_children_num_ctx_switches_involuntary
logging.debug("returning dict from accumulate and prepare")
return d

next_send = time.time()
Expand All @@ -323,7 +356,7 @@ def accumulate_and_prepare() -> Dict[str, Any]:
try:
d = accumulate_and_prepare()
if time.time() >= next_send:
logging.debug("Sending intermediate resource message")
logging.debug(f"Sending intermediate resource message: {d}")
radio.send((MessageType.RESOURCE_INFO, d))
next_send += sleep_dur
except Exception:
Expand All @@ -337,9 +370,10 @@ def accumulate_and_prepare() -> Dict[str, Any]:

terminate_event.wait(max(0, min(next_send - time.time(), accumulate_dur)))

logging.debug("Sending final resource message")
try:
logging.debug("Preparing final resource message")
d = accumulate_and_prepare()
logging.debug(f"Sending final resource message: {d}")
radio.send((MessageType.RESOURCE_INFO, d))
except Exception:
logging.exception("Exception getting the resource usage. Not sending final usage to Hub", exc_info=True)
Expand All @@ -351,4 +385,4 @@ def accumulate_and_prepare() -> Dict[str, Any]:
logging.debug("Sending result_radio_queue")
terminate_queue.put(result_radio_queue)

logging.debug("End of monitoring helper")
logging.info("End of monitoring helper")
1 change: 1 addition & 0 deletions parsl/tests/configs/workqueue_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ def fresh_config():
monitoring=MonitoringHub(hub_address="localhost",
hub_port=55055,
monitoring_debug=True,
resource_monitoring_enabled=True,
resource_monitoring_interval=1,
))

0 comments on commit eca9b9d

Please sign in to comment.