Skip to content

Commit

Permalink
Re-enable str-bytes-safe mypy error, and fix uses (#2938)
Browse files Browse the repository at this point in the history
These changes are all in the interchange. In most cases, this error is
found when a manager id (a byte sequence) is used in a string substitution,
and to make happy, the format string is modified to indicate that we
really do want to represent that byte sequence as a repr.

The only other place is one rendering of a binary message into a log
string, and the fix is the same.

from benc-mypy branch
  • Loading branch information
benclifford authored Nov 1, 2023
1 parent 70b176a commit f95461e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 24 deletions.
5 changes: 0 additions & 5 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
[mypy]
plugins = sqlalchemy.ext.mypy.plugin

# globally disabled error codes:
# str-bytes-safe warns that a byte string is formatted into a string.
# which is commonly done with manager IDs in the parsl
# codebase.
disable_error_code = str-bytes-safe
enable_error_code = ignore-without-code
no_implicit_reexport = True
warn_redundant_casts = True
Expand Down
37 changes: 18 additions & 19 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def _command_server(self):
elif command_req.startswith("HOLD_WORKER"):
cmd, s_manager = command_req.split(';')
manager_id = s_manager.encode('utf-8')
logger.info("Received HOLD_WORKER for {}".format(manager_id))
logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
Expand Down Expand Up @@ -396,9 +396,9 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
msg = json.loads(message[1].decode('utf-8'))
reg_flag = True
except Exception:
logger.warning("Got Exception reading registration message from manager: {}".format(
logger.warning("Got Exception reading registration message from manager: {!r}".format(
manager_id), exc_info=True)
logger.debug("Message: \n{}\n".format(message[1]))
logger.debug("Message: \n{!r}\n".format(message[1]))
else:
# We set up an entry only if registration works correctly
self._ready_managers[manager_id] = {'last_heartbeat': time.time(),
Expand All @@ -410,15 +410,15 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
'tasks': []}
if reg_flag is True:
interesting_managers.add(manager_id)
logger.info("Adding manager: {} to ready queue".format(manager_id))
logger.info("Adding manager: {!r} to ready queue".format(manager_id))
m = self._ready_managers[manager_id]
m.update(msg)
logger.info("Registration info for manager {}: {}".format(manager_id, msg))
logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
self._send_monitoring_info(hub_channel, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
logger.error("Manager {} has incompatible version info with the interchange".format(manager_id))
logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
logger.debug("Setting kill event")
kill_event.set()
e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
Expand All @@ -431,19 +431,18 @@ def process_task_outgoing_incoming(self, interesting_managers, hub_channel, kill
self.results_outgoing.send(pkl_package)
logger.error("Sent failure reports, shutting down interchange")
else:
logger.info("Manager {} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
else:
# Registration has failed.
logger.debug("Suppressing bad registration from manager: {}".format(
manager_id))
logger.debug("Suppressing bad registration from manager: {!r}".format(manager_id))

else:
tasks_requested = int.from_bytes(message[1], "little")
self._ready_managers[manager_id]['last_heartbeat'] = time.time()
if tasks_requested == HEARTBEAT_CODE:
logger.debug("Manager {} sent heartbeat via tasks connection".format(manager_id))
logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
else:
logger.error("Unexpected non-heartbeat message received from manager {}")
Expand Down Expand Up @@ -497,9 +496,9 @@ def process_results_incoming(self, interesting_managers, hub_channel):
logger.debug("entering results_incoming section")
manager_id, *all_messages = self.results_incoming.recv_multipart()
if manager_id not in self._ready_managers:
logger.warning("Received a result from a un-registered manager: {}".format(manager_id))
logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
else:
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id}")
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")

b_messages = []

Expand All @@ -511,7 +510,7 @@ def process_results_incoming(self, interesting_managers, hub_channel):
elif r['type'] == 'monitoring':
hub_channel.send_pyobj(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id} sent heartbeat via results connection")
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))
Expand All @@ -523,11 +522,11 @@ def process_results_incoming(self, interesting_managers, hub_channel):
if r['type'] == 'result':
got_result = True
try:
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id}")
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
m['tasks'].remove(r['task_id'])
except Exception:
# If we reach here, there's something very wrong.
logger.exception("Ignoring exception removing task_id {} for manager {} with task list {}".format(
logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
r['task_id'],
manager_id,
m['tasks']))
Expand All @@ -541,7 +540,7 @@ def process_results_incoming(self, interesting_managers, hub_channel):
self.results_outgoing.send_multipart(b_messages_to_send)
logger.debug("Sent messages on results_outgoing")

logger.debug(f"Current tasks on manager {manager_id}: {m['tasks']}")
logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
if len(m['tasks']) == 0 and m['idle_since'] is None:
m['idle_since'] = time.time()

Expand All @@ -558,7 +557,7 @@ def expire_bad_managers(self, interesting_managers, hub_channel):
time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
for (manager_id, m) in bad_managers:
logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
logger.warning(f"Too many heartbeats missed for manager {manager_id} - removing manager")
logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
if m['active']:
m['active'] = False
self._send_monitoring_info(hub_channel, m)
Expand Down

0 comments on commit f95461e

Please sign in to comment.