Skip to content

Commit

Permalink
feat: add timeouts for draining/closing
Browse files Browse the repository at this point in the history
This mitigates potential deadlocks caused by the client/server
applying backpressure to each other at the same time.
  • Loading branch information
thegamecracks committed Apr 2, 2024
1 parent a152f5f commit 69ac37b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
21 changes: 17 additions & 4 deletions src/dumdum/client/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ def __init__(
nick: str,
*,
event_callback: Callable[[ClientEvent], Any],
drain_timeout: float = 30,
close_timeout: float = 5,
) -> None:
self.nick = nick
self.event_callback = event_callback
self.drain_timeout = drain_timeout
self.close_timeout = close_timeout

self._protocol = Client(nick)
self._reader = None
Expand Down Expand Up @@ -92,8 +96,7 @@ async def close(self) -> None:

# Any exceptions here will be repeated in _read_loop()
self._writer.close()
with contextlib.suppress(Exception):
await self._writer.wait_closed()
await self._wait_closed()

async def send_message(self, channel_name: str, content: str) -> None:
data = self._protocol.send_message(channel_name, content)
Expand Down Expand Up @@ -134,7 +137,7 @@ async def _read_loop(
events, outgoing = self._protocol.receive_bytes(data)
writer.write(outgoing)
await self._handle_events(events)
await writer.drain() # exert backpressure
await self._drain() # exert backpressure

async def _handshake(self) -> bool | None:
assert self._writer is not None
Expand Down Expand Up @@ -185,4 +188,14 @@ def _dispatch_event(self, event: ClientEvent) -> None:
async def _send_and_drain(self, data: bytes) -> None:
assert self._writer is not None
self._writer.write(data)
await self._writer.drain()
await self._drain()

async def _drain(self) -> None:
assert self._writer is not None
await asyncio.wait_for(self._writer.drain(), timeout=self.drain_timeout)

async def _wait_closed(self) -> None:
assert self._writer is not None
timeout = self.close_timeout
with contextlib.suppress(Exception):
await asyncio.wait_for(self._writer.wait_closed(), timeout=timeout)
6 changes: 5 additions & 1 deletion src/dumdum/server/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ async def communicate(self) -> None:
events, outgoing = self.server.receive_bytes(data)
self.writer.write(outgoing)
await self._handle_events(events)
await self.writer.drain() # exert backpressure
await self._drain() # exert backpressure

async def _handle_events(self, events: list[ServerEvent]) -> None:
await self.manager._handle_events(self, events)

async def _drain(self) -> None:
timeout = self.manager.drain_timeout
await asyncio.wait_for(self.writer.drain(), timeout=timeout)
19 changes: 16 additions & 3 deletions src/dumdum/server/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@


class Manager:
def __init__(self, state: ServerState, ssl: ssl.SSLContext | None) -> None:
def __init__(
self,
state: ServerState,
ssl: ssl.SSLContext | None,
*,
drain_timeout: float = 30,
close_timeout: float = 5,
) -> None:
self.state = state
self.connections: list[Connection] = []
self.ssl = ssl
self.drain_timeout = drain_timeout
self.close_timeout = close_timeout

async def accept_connection(
self,
Expand All @@ -52,14 +61,18 @@ async def accept_connection(
log.info("Connection %s has disconnected", addr)

writer.close()
with contextlib.suppress(Exception):
await writer.wait_closed()
await self._wait_closed(writer)

self._close_connection(connection)

def _create_server(self) -> Server:
return Server()

async def _wait_closed(self, writer: asyncio.StreamWriter) -> None:
timeout = self.close_timeout
with contextlib.suppress(Exception):
await asyncio.wait_for(writer.wait_closed(), timeout=timeout)

async def _handle_events(self, conn: Connection, events: list[ServerEvent]) -> None:
for event in events:
await self._handle_event(conn, event)
Expand Down

0 comments on commit 69ac37b

Please sign in to comment.