Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PYTHON-4579 Stop gossiping $clusterTime on SDAM connections #1925

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
from typing import TYPE_CHECKING, Any, Optional

from pymongo import common
from pymongo._csot import MovingMinimum
from pymongo.asynchronous import periodic_executor
from pymongo.asynchronous.periodic_executor import _shutdown_executors
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.errors import NetworkTimeout, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
Expand Down Expand Up @@ -245,13 +245,7 @@ async def _check_server(self) -> ServerDescription:
"""
start = time.monotonic()
try:
try:
return await self._check_once()
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
return await self._check_once()
except ReferenceError:
raise
except Exception as error:
Expand Down Expand Up @@ -345,7 +339,6 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]

Can raise ConnectionFailure or OperationFailure.
"""
cluster_time = self._topology.max_cluster_time()
start = time.monotonic()
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
Expand All @@ -355,13 +348,13 @@ async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]
):
# Initiate streaming hello (MongoDB 4.4+).
response = await conn._hello(
cluster_time,
None,
self._server_description.topology_version,
self._settings.heartbeat_frequency,
)
else:
# New connection handshake or polling hello (MongoDB <4.4).
response = await conn._hello(cluster_time, None, None)
response = await conn._hello(None, None, None)
duration = _monotonic_duration(start)
return response, duration

Expand Down
4 changes: 4 additions & 0 deletions pymongo/asynchronous/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ async def command(
)

response_doc = unpacked_docs[0]
if not conn.ready:
cluster_time = response_doc.get("$clusterTime")
if cluster_time:
conn._cluster_time = cluster_time
if client:
await client._process_response(response_doc, session)
if check:
Expand Down
5 changes: 5 additions & 0 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ def __init__(
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None

def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
Expand Down Expand Up @@ -1304,6 +1306,9 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
await handler.client._topology.receive_cluster_time(conn._cluster_time)

return conn

@contextlib.asynccontextmanager
Expand Down
3 changes: 2 additions & 1 deletion pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ async def _process_change(

self._description = new_td
await self._update_servers()
self._receive_cluster_time_no_lock(server_description.cluster_time)
# TODO: Verify that app errors update the $clusterTime.
# self._receive_cluster_time_no_lock(server_description.cluster_time)

if self._publish_tp and not suppress_event:
assert self._events is not None
Expand Down
17 changes: 5 additions & 12 deletions pymongo/synchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Mapping, Optional, cast
from typing import TYPE_CHECKING, Any, Optional

from pymongo import common
from pymongo._csot import MovingMinimum
from pymongo.errors import NetworkTimeout, NotPrimaryError, OperationFailure, _OperationCancelled
from pymongo.errors import NetworkTimeout, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
Expand Down Expand Up @@ -245,13 +245,7 @@ def _check_server(self) -> ServerDescription:
"""
start = time.monotonic()
try:
try:
return self._check_once()
except (OperationFailure, NotPrimaryError) as exc:
# Update max cluster time even when hello fails.
details = cast(Mapping[str, Any], exc.details)
self._topology.receive_cluster_time(details.get("$clusterTime"))
raise
return self._check_once()
except ReferenceError:
raise
except Exception as error:
Expand Down Expand Up @@ -345,7 +339,6 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:

Can raise ConnectionFailure or OperationFailure.
"""
cluster_time = self._topology.max_cluster_time()
start = time.monotonic()
if conn.more_to_come:
# Read the next streaming hello (MongoDB 4.4+).
Expand All @@ -355,13 +348,13 @@ def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]:
):
# Initiate streaming hello (MongoDB 4.4+).
response = conn._hello(
cluster_time,
None,
self._server_description.topology_version,
self._settings.heartbeat_frequency,
)
else:
# New connection handshake or polling hello (MongoDB <4.4).
response = conn._hello(cluster_time, None, None)
response = conn._hello(None, None, None)
duration = _monotonic_duration(start)
return response, duration

Expand Down
4 changes: 4 additions & 0 deletions pymongo/synchronous/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ def command(
)

response_doc = unpacked_docs[0]
if not conn.ready:
cluster_time = response_doc.get("$clusterTime")
if cluster_time:
conn._cluster_time = cluster_time
if client:
client._process_response(response_doc, session)
if check:
Expand Down
5 changes: 5 additions & 0 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ def __init__(
self.connect_rtt = 0.0
self._client_id = pool._client_id
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None

def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
Expand Down Expand Up @@ -1298,6 +1300,9 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect
conn.close_conn(ConnectionClosedReason.ERROR)
raise

if handler:
handler.client._topology.receive_cluster_time(conn._cluster_time)

return conn

@contextlib.contextmanager
Expand Down
3 changes: 2 additions & 1 deletion pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ def _process_change(

self._description = new_td
self._update_servers()
self._receive_cluster_time_no_lock(server_description.cluster_time)
# TODO: Verify that app errors update the $clusterTime.
# self._receive_cluster_time_no_lock(server_description.cluster_time)

if self._publish_tp and not suppress_event:
assert self._events is not None
Expand Down
20 changes: 9 additions & 11 deletions test/test_discovery_and_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class TestClusterTimeComparison(unittest.TestCase):
def test_cluster_time_comparison(self):
t = create_mock_topology("mongodb://host")

def send_cluster_time(time, inc, should_update):
def send_cluster_time(time, inc):
old = t.max_cluster_time()
new = {"clusterTime": Timestamp(time, inc)}
got_hello(
Expand All @@ -259,16 +259,14 @@ def send_cluster_time(time, inc, should_update):
)

actual = t.max_cluster_time()
if should_update:
self.assertEqual(actual, new)
else:
self.assertEqual(actual, old)

send_cluster_time(0, 1, True)
send_cluster_time(2, 2, True)
send_cluster_time(2, 1, False)
send_cluster_time(1, 3, False)
send_cluster_time(2, 3, True)
# We never update $clusterTime from monitoring connections.
self.assertEqual(actual, old)

send_cluster_time(0, 1)
send_cluster_time(2, 2)
send_cluster_time(2, 1)
send_cluster_time(1, 3)
send_cluster_time(2, 3)


class TestIgnoreStaleErrors(IntegrationTest):
Expand Down
Loading