Skip to content

Commit

Permalink
Non-functional change: minor log call-site updates (#3597)
Browse files Browse the repository at this point in the history
Massage log statements to use argument style consistent with recent practice
  • Loading branch information
khk-globus authored Aug 21, 2024
1 parent 0fc966f commit b284dc1
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def start(self) -> None:

self.zmq_context.destroy()
delta = time.time() - start
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.info(f"Processed {self.count} tasks in {delta} seconds")
logger.warning("Exiting")

def process_task_outgoing_incoming(
Expand All @@ -396,17 +396,16 @@ def process_task_outgoing_incoming(
try:
msg = json.loads(message[1].decode('utf-8'))
except Exception:
logger.warning("Got Exception reading message from manager: {!r}".format(
manager_id), exc_info=True)
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True)
logger.debug("Message:\n %r\n", message[1])
return

# perform a bit of validation on the structure of the deserialized
# object, at least enough to behave like a deserialization error
# in obviously malformed cases
if not isinstance(msg, dict) or 'type' not in msg:
logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}")
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.debug("Message:\n %r\n", message[1])
return

if msg['type'] == 'registration':
Expand All @@ -425,7 +424,7 @@ def process_task_outgoing_incoming(
self.connected_block_history.append(msg['block_id'])

interesting_managers.add(manager_id)
logger.info("Adding manager: {!r} to ready queue".format(manager_id))
logger.info(f"Adding manager: {manager_id!r} to ready queue")
m = self._ready_managers[manager_id]

# m is a ManagerRecord, but msg is a dict[Any,Any] and so can
Expand All @@ -434,12 +433,12 @@ def process_task_outgoing_incoming(
# later.
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
logger.info(f"Registration info for manager {manager_id!r}: {msg}")
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange")
logger.debug("Setting kill event")
kill_event.set()
e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
Expand All @@ -452,16 +451,15 @@ def process_task_outgoing_incoming(
self.results_outgoing.send(pkl_package)
logger.error("Sent failure reports, shutting down interchange")
else:
logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}")
logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}")
elif msg['type'] == 'heartbeat':
self._ready_managers[manager_id]['last_heartbeat'] = time.time()
logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
logger.debug("Manager %r sent heartbeat via tasks connection", manager_id)
self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
elif msg['type'] == 'drain':
self._ready_managers[manager_id]['draining'] = True
logger.debug(f"Manager {manager_id!r} requested drain")
logger.debug("Manager %r requested drain", manager_id)
else:
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")
Expand All @@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers

logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
total=len(self._ready_managers),
interesting=len(interesting_managers)))
logger.debug(
"Managers count (interesting/total): {}/{}",
len(interesting_managers),
len(self._ready_managers)
)

if interesting_managers and not self.pending_task_queue.empty():
shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
Expand All @@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tasks_inflight = len(m['tasks'])
real_capacity = m['max_capacity'] - tasks_inflight

if (real_capacity and m['active'] and not m['draining']):
if real_capacity and m["active"] and not m["draining"]:
tasks = self.get_tasks(real_capacity)
if tasks:
self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)])
Expand All @@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tids = [t['task_id'] for t in tasks]
m['tasks'].extend(tids)
m['idle_since'] = None
logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id))
logger.debug("Sent tasks: %s to manager %r", tids, manager_id)
# recompute real_capacity after sending tasks
real_capacity = m['max_capacity'] - tasks_inflight
if real_capacity > 0:
logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity))
logger.debug("Manager %r has free capacity %s", manager_id, real_capacity)
# ... so keep it in the interesting_managers list
else:
logger.debug("Manager {!r} is now saturated".format(manager_id))
logger.debug("Manager %r is now saturated", manager_id)
interesting_managers.remove(manager_id)
else:
interesting_managers.remove(manager_id)
# logger.debug("Nothing to send to manager {}".format(manager_id))
logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers)))
logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers))
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

Expand All @@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
logger.debug("entering results_incoming section")
manager_id, *all_messages = self.results_incoming.recv_multipart()
if manager_id not in self._ready_managers:
logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
logger.warning(f"Received a result from a un-registered manager: {manager_id!r}")
else:
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")
logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id)

b_messages = []

Expand All @@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_

monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
logger.debug("Manager %r sent heartbeat via results connection", manager_id)
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))
logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])

got_result = False
m = self._ready_managers[manager_id]
Expand All @@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
if r['type'] == 'result':
got_result = True
try:
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id)
m['tasks'].remove(r['task_id'])
except Exception:
# If we reach here, there's something very wrong.
logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
logger.exception(
"Ignoring exception removing task_id %s for manager %r with task list %s",
r['task_id'],
manager_id,
m['tasks']))
m["tasks"]
)

b_messages_to_send = []
for (b_message, _) in b_messages:
Expand All @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
self.results_outgoing.send_multipart(b_messages_to_send)
logger.debug("Sent messages on results_outgoing")

logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"])
if len(m['tasks']) == 0 and m['idle_since'] is None:
m['idle_since'] = time.time()

Expand Down

0 comments on commit b284dc1

Please sign in to comment.