# samples/noxfile.py
# Nox session named ``stale_read``; it delegates to ``_sample`` in the same
# way as the neighbouring sessions in this file.
@nox.session()
def stale_read(session):
    _sample(session)


# samples/stale_read_sample.py
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
from sqlalchemy import create_engine, Engine, select, text
from sqlalchemy.orm import Session
from sample_helper import run_sample
from model import Singer


def _print_singers(singers):
    # Print the full name of every singer in the given list, one per line.
    for singer in singers:
        print("Singer: ", singer.full_name)


def stale_read_sample():
    """Show how to execute stale reads on Spanner using SQLAlchemy.

    The sample records the current database timestamp, inserts two test
    rows, and then reads the singers three times:

    1. in a read-only transaction without a staleness option,
    2. in a read-only transaction that reads at the recorded timestamp,
    3. in auto-commit mode with a maximum staleness of 15 seconds.
    """
    engine = create_engine(
        "spanner:///projects/sample-project/"
        "instances/sample-instance/"
        "databases/sample-database",
        echo=True,
    )

    # Fetch the current database timestamp first. It is used further down to
    # query the database at a point in time where we know it was empty.
    autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")
    with Session(autocommit_engine) as session:
        row = session.execute(select(text("current_timestamp"))).one()
        timestamp = row[0]
        print(timestamp)

    # Add a few test rows after the timestamp has been taken.
    insert_test_data(engine)

    # A read-only transaction with a strong timestamp bound reads all data
    # that has been committed at the time the transaction starts.
    # Read-only transactions do not take locks, and are therefore preferred
    # over read/write transactions for workloads that only read data on
    # Spanner.
    strong_read_engine = engine.execution_options(read_only=True)
    with Session(strong_read_engine) as session:
        print("Found singers with strong timestamp bound:")
        _print_singers(session.query(Singer).order_by(Singer.last_name).all())

    # A read-only transaction that selects data in the past. It uses the
    # timestamp that was retrieved before the test data was inserted.
    past_read_engine = engine.execution_options(
        read_only=True, staleness={"read_timestamp": timestamp}
    )
    with Session(past_read_engine) as session:
        print("Searching for singers using a read timestamp in the past:")
        singers = session.query(Singer).order_by(Singer.last_name).all()
        if not singers:
            print("No singers found.")
        else:
            _print_singers(singers)

    # Spanner also supports min_read_timestamp and max_staleness as staleness
    # options. These can only be used in auto-commit mode.
    # Spanner will choose a read timestamp that satisfies the given
    # restriction and that can be served as efficiently as possible.
    max_staleness_engine = engine.execution_options(
        isolation_level="AUTOCOMMIT", staleness={"max_staleness": {"seconds": 15}}
    )
    with Session(max_staleness_engine) as session:
        print("Searching for singers using a max staleness of 15 seconds:")
        singers = session.query(Singer).order_by(Singer.last_name).all()
        if not singers:
            print("No singers found.")
        else:
            _print_singers(singers)


def insert_test_data(engine: Engine):
    """Insert two singers (John Doe and Jane Doe) and commit them.

    Args:
        engine: The engine the inserting session is bound to.
    """
    with Session(engine) as session:
        # Each singer gets a freshly generated random UUID as its id.
        new_singers = [
            Singer(id=str(uuid.uuid4()), first_name=first_name, last_name="Doe")
            for first_name in ("John", "Jane")
        ]
        session.add_all(new_singers)
        session.commit()


if __name__ == "__main__":
    run_sample(stale_read_sample)


# test/mockserver_tests/stale_read_model.py
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import BigInteger, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """Declarative base class for the model in this module."""


class Singer(Base):
    """ORM model mapped to the ``singers`` table."""

    __tablename__ = "singers"

    # Primary key, declared with the BigInteger column type.
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Declared with the String column type.
    name: Mapped[str] = mapped_column(String)


# test/mockserver_tests/test_stale_reads.py
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from sqlalchemy.testing import eq_, is_instance_of
from google.cloud.spanner_v1 import (
    FixedSizePool,
    BatchCreateSessionsRequest,
    ExecuteSqlRequest,
    GetSessionRequest,
    BeginTransactionRequest,
    TransactionOptions,
)
from test.mockserver_tests.mock_server_test_base import MockServerTestBase
from test.mockserver_tests.mock_server_test_base import add_result
import google.cloud.spanner_v1.types.type as spanner_type
import google.cloud.spanner_v1.types.result_set as result_set

# SQL text under which the singers result is registered for ``select(Singer)``.
# The space before the newline is deliberate: SQLAlchemy renders
# "SELECT <columns> \nFROM <table>", and the registered text presumably has to
# match the executed statement exactly (verify against the mock server).
_SELECT_SINGERS_SQL = "SELECT singers.id, singers.name \nFROM singers"


def _verify_request_types(requests, expected_types):
    # Check that exactly the expected number of requests was received and
    # that each request is an instance of the type expected at its position.
    eq_(len(expected_types), len(requests))
    for request, expected_type in zip(requests, expected_types):
        is_instance_of(request, expected_type)


class TestStaleReads(MockServerTestBase):
    """Mock-server tests for the ``staleness`` execution option."""

    def _create_engine(self):
        # Engine that talks to the mock server through the test's client,
        # using a fixed-size session pool of 10.
        return create_engine(
            "spanner:///projects/p/instances/i/databases/d",
            echo=True,
            connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
        )

    def test_stale_read_multi_use(self):
        """Read-only transactions with a read timestamp send it on begin.

        Runs two read-only transactions of two queries each and verifies
        that both BeginTransaction requests carry the configured
        ``read_timestamp``.
        """
        from test.mockserver_tests.stale_read_model import Singer

        add_singer_query_result(_SELECT_SINGERS_SQL)
        engine = self._create_engine()

        timestamp = datetime.datetime.fromtimestamp(1733328910)
        for _ in range(2):
            with Session(
                engine.execution_options(
                    read_only=True,
                    staleness={"read_timestamp": timestamp},
                )
            ) as session:
                # Execute two queries in a read-only transaction.
                session.scalars(select(Singer)).all()
                session.scalars(select(Singer)).all()

        # Verify the requests that we got.
        requests = self.spanner_service.requests
        _verify_request_types(
            requests,
            [
                BatchCreateSessionsRequest,
                # We should get rid of this extra round-trip for GetSession.
                GetSessionRequest,
                BeginTransactionRequest,
                ExecuteSqlRequest,
                ExecuteSqlRequest,
                GetSessionRequest,
                BeginTransactionRequest,
                ExecuteSqlRequest,
                ExecuteSqlRequest,
            ],
        )
        # Verify that the transaction is a read-only transaction.
        expected_options = TransactionOptions(
            dict(
                read_only=TransactionOptions.ReadOnly(
                    dict(
                        read_timestamp={"seconds": 1733328910},
                        return_read_timestamp=True,
                    )
                )
            )
        )
        for index in [2, 6]:
            begin_request: BeginTransactionRequest = requests[index]
            eq_(expected_options, begin_request.options)

    def test_stale_read_single_use(self):
        """Auto-commit queries with ``max_staleness`` use single-use reads.

        Runs two queries in auto-commit mode and verifies that each
        ExecuteSql request carries a single-use read-only transaction with
        the configured ``max_staleness``.
        """
        from test.mockserver_tests.stale_read_model import Singer

        # Register the result under the same SQL text as the multi-use test
        # so that this test does not depend on a result added elsewhere.
        add_singer_query_result(_SELECT_SINGERS_SQL)
        engine = self._create_engine()

        with Session(
            engine.execution_options(
                isolation_level="AUTOCOMMIT",
                staleness={"max_staleness": {"seconds": 15}},
            )
        ) as session:
            # Execute two queries in autocommit.
            session.scalars(select(Singer)).all()
            session.scalars(select(Singer)).all()

        # Verify the requests that we got.
        requests = self.spanner_service.requests
        _verify_request_types(
            requests,
            [
                BatchCreateSessionsRequest,
                # We should get rid of this extra round-trip for GetSession.
                GetSessionRequest,
                ExecuteSqlRequest,
                GetSessionRequest,
                ExecuteSqlRequest,
            ],
        )
        # Verify that the requests use a stale read.
        expected_options = TransactionOptions(
            dict(
                read_only=TransactionOptions.ReadOnly(
                    dict(
                        max_staleness={"seconds": 15},
                        return_read_timestamp=True,
                    )
                )
            )
        )
        for index in [2, 4]:
            execute_request: ExecuteSqlRequest = requests[index]
            eq_(expected_options, execute_request.transaction.single_use)


def _singer_column(name, type_code):
    # Build one result-set column description with the given name and type.
    return spanner_type.StructType.Field(
        dict(
            name=name,
            type=spanner_type.Type(dict(code=type_code)),
        )
    )


def add_singer_query_result(sql: str):
    """Register a two-row singers result for ``sql`` through ``add_result``.

    The result has an INT64 column ``singers_id`` and a STRING column
    ``singers_name``, with the rows ("1", "Jane Doe") and ("2", "John Doe").

    Args:
        sql: The SQL text the result is registered under.
    """
    result = result_set.ResultSet(
        dict(
            metadata=result_set.ResultSetMetadata(
                dict(
                    row_type=spanner_type.StructType(
                        dict(
                            fields=[
                                _singer_column(
                                    "singers_id", spanner_type.TypeCode.INT64
                                ),
                                _singer_column(
                                    "singers_name", spanner_type.TypeCode.STRING
                                ),
                            ]
                        )
                    )
                )
            ),
        )
    )
    result.rows.extend(
        [
            ("1", "Jane Doe"),
            ("2", "John Doe"),
        ]
    )
    add_result(sql, result)