Non-functional change: minor log call-site updates #3597

Merged
merged 1 commit on Aug 21, 2024
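The changes below swap str.format() call sites for f-strings on most info/warning/error messages, and for deferred printf-style arguments on debug-level (and a few other) calls, which the logging module only interpolates if the record is actually emitted. A minimal sketch of the two styles using the standard library logging module; the manager_id value here is an illustrative stand-in, not taken from the diff:

import logging

logger = logging.getLogger(__name__)
manager_id = b"manager-001"  # illustrative placeholder value

# Eager formatting: the f-string is built before the call; reasonable for
# messages that are expected to be emitted whenever the call runs.
logger.info(f"Adding manager: {manager_id!r} to ready queue")

# Deferred formatting: logging interpolates the %-style arguments only if
# the DEBUG level is enabled, so disabled debug lines stay cheap.
logger.debug("Manager %r sent heartbeat via tasks connection", manager_id)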
60 changes: 31 additions & 29 deletions parsl/executors/high_throughput/interchange.py
@@ -375,7 +375,7 @@ def start(self) -> None:

self.zmq_context.destroy()
delta = time.time() - start
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.info(f"Processed {self.count} tasks in {delta} seconds")
logger.warning("Exiting")

def process_task_outgoing_incoming(
@@ -396,17 +396,16 @@ def process_task_outgoing_incoming(
try:
msg = json.loads(message[1].decode('utf-8'))
except Exception:
logger.warning("Got Exception reading message from manager: {!r}".format(
manager_id), exc_info=True)
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True)
logger.debug("Message:\n %r\n", message[1])
return

# perform a bit of validation on the structure of the deserialized
# object, at least enough to behave like a deserialization error
# in obviously malformed cases
if not isinstance(msg, dict) or 'type' not in msg:
logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}")
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.debug("Message:\n %r\n", message[1])
return

if msg['type'] == 'registration':
@@ -425,7 +424,7 @@ def process_task_outgoing_incoming(
self.connected_block_history.append(msg['block_id'])

interesting_managers.add(manager_id)
logger.info("Adding manager: {!r} to ready queue".format(manager_id))
logger.info(f"Adding manager: {manager_id!r} to ready queue")
m = self._ready_managers[manager_id]

# m is a ManagerRecord, but msg is a dict[Any,Any] and so can
@@ -434,12 +433,12 @@ def process_task_outgoing_incoming(
# later.
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
logger.info(f"Registration info for manager {manager_id!r}: {msg}")
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange")
logger.debug("Setting kill event")
kill_event.set()
e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
@@ -452,16 +451,15 @@ def process_task_outgoing_incoming(
self.results_outgoing.send(pkl_package)
logger.error("Sent failure reports, shutting down interchange")
else:
logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}")
logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}")
elif msg['type'] == 'heartbeat':
self._ready_managers[manager_id]['last_heartbeat'] = time.time()
logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
logger.debug("Manager %r sent heartbeat via tasks connection", manager_id)
self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
elif msg['type'] == 'drain':
self._ready_managers[manager_id]['draining'] = True
logger.debug(f"Manager {manager_id!r} requested drain")
logger.debug("Manager %r requested drain", manager_id)
else:
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")
@@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers

logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
total=len(self._ready_managers),
interesting=len(interesting_managers)))
logger.debug(
"Managers count (interesting/total): %s/%s",
len(interesting_managers),
len(self._ready_managers)
)

if interesting_managers and not self.pending_task_queue.empty():
shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
@@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tasks_inflight = len(m['tasks'])
real_capacity = m['max_capacity'] - tasks_inflight

if (real_capacity and m['active'] and not m['draining']):
if real_capacity and m["active"] and not m["draining"]:
tasks = self.get_tasks(real_capacity)
if tasks:
self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)])
@@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tids = [t['task_id'] for t in tasks]
m['tasks'].extend(tids)
m['idle_since'] = None
logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id))
logger.debug("Sent tasks: %s to manager %r", tids, manager_id)
# recompute real_capacity after sending tasks
real_capacity = m['max_capacity'] - tasks_inflight
if real_capacity > 0:
logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity))
logger.debug("Manager %r has free capacity %s", manager_id, real_capacity)
# ... so keep it in the interesting_managers list
else:
logger.debug("Manager {!r} is now saturated".format(manager_id))
logger.debug("Manager %r is now saturated", manager_id)
interesting_managers.remove(manager_id)
else:
interesting_managers.remove(manager_id)
# logger.debug("Nothing to send to manager {}".format(manager_id))
logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers)))
logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers))
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

@@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
logger.debug("entering results_incoming section")
manager_id, *all_messages = self.results_incoming.recv_multipart()
if manager_id not in self._ready_managers:
logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
logger.warning(f"Received a result from a un-registered manager: {manager_id!r}")
else:
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")
logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id)

b_messages = []

@@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_

monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
logger.debug("Manager %r sent heartbeat via results connection", manager_id)
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))
logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])

got_result = False
m = self._ready_managers[manager_id]
@@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
if r['type'] == 'result':
got_result = True
try:
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id)
m['tasks'].remove(r['task_id'])
except Exception:
# If we reach here, there's something very wrong.
logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
logger.exception(
"Ignoring exception removing task_id %s for manager %r with task list %s",
r['task_id'],
manager_id,
m['tasks']))
m["tasks"]
)

b_messages_to_send = []
for (b_message, _) in b_messages:
@@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
self.results_outgoing.send_multipart(b_messages_to_send)
logger.debug("Sent messages on results_outgoing")

logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"])
if len(m['tasks']) == 0 and m['idle_since'] is None:
m['idle_since'] = time.time()

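One caveat worth keeping in mind for the deferred-argument calls in this diff: the standard library logging module interpolates with printf-style % placeholders, not str.format-style {} braces, so braces passed together with deferred arguments are never substituted and instead surface as a logging error when the record is emitted. A small sketch; the logger name and level setup are illustrative only:

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("interchange-demo")  # illustrative name

interesting, total = 3, 5

# Correct: logging substitutes %-style placeholders itself, and only when
# the record is actually handled.
logger.debug("Managers count (interesting/total): %s/%s", interesting, total)

# Incorrect: {} placeholders are not understood by logging; the %-interpolation
# it attempts fails, and the handler reports "--- Logging error ---" to stderr
# instead of emitting the message.
logger.debug("Managers count (interesting/total): {}/{}", interesting, total)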