From 203f70e51225ced38dafb44c2a90948b2f1fc5a1 Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 2 Nov 2023 13:29:16 +0100 Subject: [PATCH] router: shutdown via endpoints, not channels Add a .do_close() virtual method at the Endpoint level. This is already implemented by Channel, but let's also add an implementation of it to Peer which shuts down the Peer in the usual way. When the bridge gets EOF we can now call this method on all endpoints instead of creating synthetic close messages for each channel. We also stop tracking open channels for knowing when the shutdown is complete, and track endpoints instead (which is a strictly tighter constraint, as it includes endpoints with no open channels). There's the small matter of the increasingly redundant shutdown() method on routing rules, but we can clean that up soon. --- src/cockpit/peer.py | 3 +++ src/cockpit/router.py | 25 +++++++++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/cockpit/peer.py b/src/cockpit/peer.py index a20838c9ee86..2e3df6784775 100644 --- a/src/cockpit/peer.py +++ b/src/cockpit/peer.py @@ -229,6 +229,9 @@ def do_kill(self, host: Optional[str], group: Optional[str]) -> None: assert self.init_future is None self.write_control(command='kill', host=host, group=group) + def do_close(self) -> None: + self.close() + class ConfiguredPeer(Peer): config: BridgeConfig diff --git a/src/cockpit/router.py b/src/cockpit/router.py index 794867dd6703..3ac0a633ac04 100644 --- a/src/cockpit/router.py +++ b/src/cockpit/router.py @@ -76,6 +76,9 @@ def thaw_endpoint(self): self.__endpoint_frozen_queue = None # interface for receiving messages + def do_close(self): + raise NotImplementedError + def do_channel_control(self, channel: str, command: str, message: JsonObject) -> None: raise NotImplementedError @@ -160,10 +163,6 @@ def drop_channel(self, channel: str) -> None: except KeyError: logger.error('trying to drop non-existent channel %s from %s', channel, self.open_channels) - # were we waiting to exit? - if not self.open_channels and self._eof and self.transport: - self.transport.close() - def shutdown_endpoint(self, endpoint: Endpoint, **kwargs) -> None: channels = self.endpoints.pop(endpoint) logger.debug('shutdown_endpoint(%s, %s) will close %s', endpoint, kwargs, channels) @@ -171,6 +170,13 @@ def shutdown_endpoint(self, endpoint: Endpoint, **kwargs) -> None: self.write_control(command='close', channel=channel, **kwargs) self.drop_channel(channel) + # were we waiting to exit? + if self._eof: + logger.debug(' %d endpoints remaining', len(self.endpoints)) + if not self.endpoints and self.transport: + logger.debug(' close transport') + self.transport.close() + def do_kill(self, host: Optional[str], group: Optional[str]) -> None: endpoints = set(self.endpoints) logger.debug('do_kill(%s, %s). Considering %d endpoints.', host, group, len(endpoints)) @@ -213,12 +219,15 @@ def channel_data_received(self, channel: str, data: bytes) -> None: endpoint.do_channel_data(channel, data) def eof_received(self) -> bool: - self._eof = True + logger.debug('eof_received(%r)', self) - for channel, endpoint in list(self.open_channels.items()): - endpoint.do_channel_control(channel, 'close', {'command': 'close', 'channel': channel}) + endpoints = set(self.endpoints) + for endpoint in endpoints: + endpoint.do_close() - return bool(self.open_channels) + self._eof = True + logger.debug(' endpoints remaining: %r', self.endpoints) + return bool(self.endpoints) def do_closed(self, exc: Optional[Exception]) -> None: for rule in self.routing_rules: