From af282238276f7cdeade6a61580067fb2970623de Mon Sep 17 00:00:00 2001 From: Jonathan Keljo Date: Sun, 18 Feb 2024 15:47:32 -0800 Subject: [PATCH] Fix shutdown (#28) Python 3.12 changed Server.wait_closed to actually wait for the server to close, which revealed that our shutdown logic was incorrect. We need to first allow the queue to flush and close the connections, then wait. --- greeneye/monitor.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/greeneye/monitor.py b/greeneye/monitor.py index 47daf38..18e04b3 100644 --- a/greeneye/monitor.py +++ b/greeneye/monitor.py @@ -693,31 +693,32 @@ def _create_protocol(self) -> GemProtocol: async def close(self) -> None: if self._server is not None: - LOG.info( - "Closing server on {}".format(self._server.sockets[0].getsockname()) - ) + sockname = self._server.sockets[0].getsockname() + LOG.info("Closing the server on {}...".format(sockname)) # Disallow new connections self._server.close() + # Wait for packets to be processed + await self._queue.join() + + if self._consumer_task is not None: + # Cancel consumer task + self._consumer_task.cancel() + try: + await self._consumer_task + except asyncio.CancelledError: + pass + self._consumer_task = None + + while len(self._protocols) > 0: + (_, protocol) = self._protocols.popitem() + protocol.close() + # Wait for shutdown await self._server.wait_closed() self._server = None - # Wait for packets to be processed - await self._queue.join() - - if self._consumer_task is not None: - # Cancel consumer task - self._consumer_task.cancel() - try: - await self._consumer_task - except asyncio.CancelledError: - pass - self._consumer_task = None - - while len(self._protocols) > 0: - (_, protocol) = self._protocols.popitem() - protocol.close() + LOG.info("Closed the server on {}.".format(sockname)) MonitorListener = Union[Callable[[Monitor], Awaitable[None]], Callable[[Monitor], None]]