From 14c8432bc37dd1b97d00ac9e0fb4e51494231aab Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Mon, 14 Oct 2024 10:45:57 -0700 Subject: [PATCH 1/2] PYTHON-4579 Stop gossiping $clusterTime on SDAM connections --- pymongo/asynchronous/monitor.py | 17 +++++------------ pymongo/asynchronous/topology.py | 3 ++- pymongo/synchronous/monitor.py | 17 +++++------------ pymongo/synchronous/topology.py | 3 ++- test/test_discovery_and_monitoring.py | 20 +++++++++----------- 5 files changed, 23 insertions(+), 37 deletions(-) diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index f9e912b084..b31d4b708b 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -20,13 +20,13 @@ import logging import time import weakref -from typing import TYPE_CHECKING, Any, Mapping, Optional, cast +from typing import TYPE_CHECKING, Any, Optional from pymongo import common from pymongo._csot import MovingMinimum from pymongo.asynchronous import periodic_executor from pymongo.asynchronous.periodic_executor import _shutdown_executors -from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled +from pymongo.errors import NetworkTimeout, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage @@ -245,13 +245,7 @@ async def _check_server(self) -> ServerDescription: """ start = time.monotonic() try: - try: - return await self._check_once() - except (OperationFailure, NotPrimaryError) as exc: - # Update max cluster time even when hello fails. - details = cast(Mapping[str, Any], exc.details) - self._topology.receive_cluster_time(details.get("$clusterTime")) - raise + return await self._check_once() except ReferenceError: raise except Exception as error: @@ -345,7 +339,6 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float] Can raise ConnectionFailure or OperationFailure. """ - cluster_time = self._topology.max_cluster_time() start = time.monotonic() if conn.more_to_come: # Read the next streaming hello (MongoDB 4.4+). @@ -355,13 +348,13 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float] ): # Initiate streaming hello (MongoDB 4.4+). response = await conn._hello( - cluster_time, + None, self._server_description.topology_version, self._settings.heartbeat_frequency, ) else: # New connection handshake or polling hello (MongoDB <4.4). - response = await conn._hello(cluster_time, None, None) + response = await conn._hello(None, None, None) duration = _monotonic_duration(start) return response, duration diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 82af4257ba..d086c678ab 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -489,7 +489,8 @@ async def _process_change( self._description = new_td await self._update_servers() - self._receive_cluster_time_no_lock(server_description.cluster_time) + # TODO: Verify that app errors update the $clusterTime. + # self._receive_cluster_time_no_lock(server_description.cluster_time) if self._publish_tp and not suppress_event: assert self._events is not None diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index 3f9bb2ea75..99accf66e2 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -20,11 +20,11 @@ import logging import time import weakref -from typing import TYPE_CHECKING, Any, Mapping, Optional, cast +from typing import TYPE_CHECKING, Any, Optional from pymongo import common from pymongo._csot import MovingMinimum -from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled +from pymongo.errors import NetworkTimeout, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage @@ -245,13 +245,7 @@ def _check_server(self) -> ServerDescription: """ start = time.monotonic() try: - try: - return self._check_once() - except (OperationFailure, NotPrimaryError) as exc: - # Update max cluster time even when hello fails. - details = cast(Mapping[str, Any], exc.details) - self._topology.receive_cluster_time(details.get("$clusterTime")) - raise + return self._check_once() except ReferenceError: raise except Exception as error: @@ -345,7 +339,6 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]: Can raise ConnectionFailure or OperationFailure. """ - cluster_time = self._topology.max_cluster_time() start = time.monotonic() if conn.more_to_come: # Read the next streaming hello (MongoDB 4.4+). @@ -355,13 +348,13 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]: ): # Initiate streaming hello (MongoDB 4.4+). response = conn._hello( - cluster_time, + None, self._server_description.topology_version, self._settings.heartbeat_frequency, ) else: # New connection handshake or polling hello (MongoDB <4.4). - response = conn._hello(cluster_time, None, None) + response = conn._hello(None, None, None) duration = _monotonic_duration(start) return response, duration diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index a350c1702e..b32800b9e8 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -489,7 +489,8 @@ def _process_change( self._description = new_td self._update_servers() - self._receive_cluster_time_no_lock(server_description.cluster_time) + # TODO: Verify that app errors update the $clusterTime. + # self._receive_cluster_time_no_lock(server_description.cluster_time) if self._publish_tp and not suppress_event: assert self._events is not None diff --git a/test/test_discovery_and_monitoring.py b/test/test_discovery_and_monitoring.py index ce7a52f1a0..70dcfc5b48 100644 --- a/test/test_discovery_and_monitoring.py +++ b/test/test_discovery_and_monitoring.py @@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase): def test_cluster_time_comparison(self): t = create_mock_topology("mongodb://host") - def send_cluster_time(time, inc, should_update): + def send_cluster_time(time, inc): old = t.max_cluster_time() new = {"clusterTime": Timestamp(time, inc)} got_hello( @@ -259,16 +259,14 @@ def send_cluster_time(time, inc, should_update): ) actual = t.max_cluster_time() - if should_update: - self.assertEqual(actual, new) - else: - self.assertEqual(actual, old) - - send_cluster_time(0, 1, True) - send_cluster_time(2, 2, True) - send_cluster_time(2, 1, False) - send_cluster_time(1, 3, False) - send_cluster_time(2, 3, True) + # We never update $clusterTime from monitoring connections. + self.assertEqual(actual, old) + + send_cluster_time(0, 1) + send_cluster_time(2, 2) + send_cluster_time(2, 1) + send_cluster_time(1, 3) + send_cluster_time(2, 3) class TestIgnoreStaleErrors(IntegrationTest): From 22ae993113d6e70a0b0516ea22579ef067ba29c2 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Mon, 14 Oct 2024 14:36:52 -0700 Subject: [PATCH 2/2] PYTHON-4579 Gossip $clusterTime from connection handshake --- pymongo/asynchronous/network.py | 4 ++++ pymongo/asynchronous/pool.py | 5 +++++ pymongo/synchronous/network.py | 4 ++++ pymongo/synchronous/pool.py | 5 +++++ 4 files changed, 18 insertions(+) diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index d17aead120..c7a5580eca 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -207,6 +207,10 @@ async def command( ) response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time if client: await client._process_response(response_doc, session) if check: diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index a9f02d650a..51702231f2 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -312,6 +312,8 @@ def __init__( self.connect_rtt = 0.0 self._client_id = pool._client_id self.creation_time = time.monotonic() + # For gossiping $clusterTime from the connection handshake to the client. + self._cluster_time = None def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -1304,6 +1306,9 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A conn.close_conn(ConnectionClosedReason.ERROR) raise + if handler: + await handler.client._topology.receive_cluster_time(conn._cluster_time) + return conn @contextlib.asynccontextmanager diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 7206dca735..543b069bfc 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -207,6 +207,10 @@ def command( ) response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time if client: client._process_response(response_doc, session) if check: diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index eb007a3471..95fa879633 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -312,6 +312,8 @@ def __init__( self.connect_rtt = 0.0 self._client_id = pool._client_id self.creation_time = time.monotonic() + # For gossiping $clusterTime from the connection handshake to the client. + self._cluster_time = None def set_conn_timeout(self, timeout: Optional[float]) -> None: """Cache last timeout to avoid duplicate calls to conn.settimeout.""" @@ -1298,6 +1300,9 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect conn.close_conn(ConnectionClosedReason.ERROR) raise + if handler: + handler.client._topology.receive_cluster_time(conn._cluster_time) + return conn @contextlib.contextmanager