From 1d71fd337bda3513c3dc972ae31a732dc22a5161 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Mon, 10 Jun 2024 11:38:13 -0700 Subject: [PATCH] Drop safir.database.create_sync_session Drop support for creating sync database sessions. This was added for vo-cutouts, which used Dramatiq as the task queuing system, since Dramatiq is sync-only. We're standardizing on arq and on async Python in general, and this code will no longer have any known callers after vo-cutouts has been updated. --- changelog.d/20240610_113548_rra_DM_44758.md | 3 + docs/user-guide/database.rst | 45 ------- src/safir/database.py | 125 ++++---------------- tests/database_test.py | 58 +-------- 4 files changed, 29 insertions(+), 202 deletions(-) create mode 100644 changelog.d/20240610_113548_rra_DM_44758.md diff --git a/changelog.d/20240610_113548_rra_DM_44758.md b/changelog.d/20240610_113548_rra_DM_44758.md new file mode 100644 index 00000000..b9669e01 --- /dev/null +++ b/changelog.d/20240610_113548_rra_DM_44758.md @@ -0,0 +1,3 @@ +### Backwards-incompatible changes + +- Drop `safir.database.create_sync_session`. This was only used by services that used Dramatiq for task management, since Dramatiq is sync-only. Services based on Safir should switch to arq and use only async database connections. diff --git a/docs/user-guide/database.rst b/docs/user-guide/database.rst index 6a837e62..6fffd1cb 100644 --- a/docs/user-guide/database.rst +++ b/docs/user-guide/database.rst @@ -426,51 +426,6 @@ For example: If the statement fails, it will be retried up to five times, waiting two seconds between attempts, before raising the underlying exception. This is particularly useful for waiting for network or a database proxy to come up when a process has first started. -Creating a sync database session --------------------------------- - -Although Safir is primarily intended to support asyncio applications, it may sometimes be necessary to write sync code that performs database operations. -One example would be `Dramatiq `__ workers. -This can be done with `~safir.database.create_sync_session`. - -.. code-block:: python - - from safir.database import create_sync_session - - from .config import config - - - session = create_sync_session(config.database_url, config.database_password) - with session.begin(): - # ... do something with the session ... - pass - -Unlike `~safir.database.create_async_session`, `~safir.database.create_sync_session` handles creating the engine internally, since sync engines do not require any special shutdown measures. - -As with :ref:`async database sessions `, you can pass a `structlog`_ logger and a statement to perform a connection check on the database before returning the session: - -.. code-block:: python - - import structlog - from safir.database import create_sync_session - from sqlalchemy.future import select - - from .config import config - from .schema import User - - - logger = structlog.get_logger(config.logger_name) - stmt = select(User) - session = create_sync_session( - config.database_url, - config.database_password, - logger, - statement=stmt, - ) - -Applications that use `~safir.database.create_sync_session` must declare a dependency on `psycopg2 `__ in their pip dependencies. -Safir itself does not depend on psycopg2, even with the ``db`` extra, since most applications that use Safir for database support will only need async sessions. - Setting an isolation level -------------------------- diff --git a/src/safir/database.py b/src/safir/database.py index db1111a6..d5aade53 100644 --- a/src/safir/database.py +++ b/src/safir/database.py @@ -3,13 +3,11 @@ from __future__ import annotations import asyncio -import time from datetime import UTC, datetime, timedelta from typing import overload from urllib.parse import quote, urlparse from pydantic import SecretStr -from sqlalchemy import create_engine from sqlalchemy.exc import OperationalError from sqlalchemy.ext.asyncio import ( AsyncEngine, @@ -17,7 +15,6 @@ async_sessionmaker, create_async_engine, ) -from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.schema import CreateSchema from sqlalchemy.sql.expression import Select from sqlalchemy.sql.schema import MetaData @@ -27,7 +24,6 @@ "DatabaseInitializationError", "create_async_session", "create_database_engine", - "create_sync_session", "datetime_from_db", "datetime_to_db", "initialize_database", @@ -38,9 +34,7 @@ class DatabaseInitializationError(Exception): """Database initialization failed.""" -def _build_database_url( - url: str, password: str | SecretStr | None, *, is_async: bool -) -> str: +def _build_database_url(url: str, password: str | SecretStr | None) -> str: """Build the authenticated URL for the database. Parameters @@ -49,8 +43,6 @@ def _build_database_url( Database connection URL, not including the password. password Database connection password. - is_async - Whether the resulting URL should be async or not. Returns ------- @@ -62,26 +54,24 @@ def _build_database_url( ValueError A password was provided but the connection URL has no username. """ - if is_async or password: - parsed_url = urlparse(url) - if is_async and parsed_url.scheme == "postgresql": - parsed_url = parsed_url._replace(scheme="postgresql+asyncpg") - if password: - if isinstance(password, SecretStr): - password = password.get_secret_value() - if not parsed_url.username: - raise ValueError(f"No username in database URL {url}") - password = quote(password, safe="") - - # The username portion of the parsed URL does not appear to decode - # URL escaping of the username, so we should not quote it again or - # we will get double-quoting. - netloc = f"{parsed_url.username}:{password}@{parsed_url.hostname}" - if parsed_url.port: - netloc = f"{netloc}:{parsed_url.port}" - parsed_url = parsed_url._replace(netloc=netloc) - url = parsed_url.geturl() - return url + parsed_url = urlparse(url) + if parsed_url.scheme == "postgresql": + parsed_url = parsed_url._replace(scheme="postgresql+asyncpg") + if password: + if isinstance(password, SecretStr): + password = password.get_secret_value() + if not parsed_url.username: + raise ValueError(f"No username in database URL {url}") + password = quote(password, safe="") + + # The username portion of the parsed URL does not appear to decode URL + # escaping of the username, so we should not quote it again or we will + # get double-quoting. + netloc = f"{parsed_url.username}:{password}@{parsed_url.hostname}" + if parsed_url.port: + netloc = f"{netloc}:{parsed_url.port}" + parsed_url = parsed_url._replace(netloc=netloc) + return parsed_url.geturl() @overload @@ -173,7 +163,7 @@ def create_database_engine( ValueError A password was provided but the connection URL has no username. """ - url = _build_database_url(url, password, is_async=True) + url = _build_database_url(url, password) if isolation_level: return create_async_engine( url, future=True, isolation_level=isolation_level @@ -243,81 +233,6 @@ async def create_async_session( return session -def create_sync_session( - url: str, - password: str | SecretStr | None, - logger: BoundLogger | None = None, - *, - isolation_level: str | None = None, - statement: Select | None = None, -) -> scoped_session: - """Create a new sync database session. - - Used instead of `create_database_engine` and `create_async_session` for - sync code, such as Dramatiq workers. This combines engine creation with - session creation. - - Parameters - ---------- - url - Database connection URL, not including the password. - password - Database connection password. - logger - Logger for reporting errors. Used only if a statement is provided. - isolation_level - If specified, sets a non-default isolation level for the database - engine. - statement - If provided, statement to run to check database connectivity. This - will be modified with ``limit(1)`` before execution. If not provided, - database connectivity will not be checked. - - Returns - ------- - sqlalchemy.orm.scoping.scoped_session - The database session proxy. This manages a separate session per - thread and therefore should be thread-safe. - - Raises - ------ - ValueError - A password was provided but the connection URL has no username. - """ - url = _build_database_url(url, password, is_async=False) - if isolation_level: - engine = create_engine(url, isolation_level=isolation_level) - else: - engine = create_engine(url) - factory = sessionmaker(bind=engine, future=True) - session = scoped_session(factory) - - # If no statement was provided, just return the scoped_session. - if statement is None: - return session - - # A statement was provided, so we want to check connectivity and retry for - # up to ten seconds before returning the session. - for _ in range(5): - try: - with session.begin(): - session.execute(statement.limit(1)) - return session - except (ConnectionRefusedError, OperationalError, OSError): - if logger: - logger.info("database not ready, waiting two seconds") - session.remove() - time.sleep(2) - continue - - # If we got here, we failed five times. Try one last time without - # catching exceptions so that we raise the appropriate exception to our - # caller. - with session.begin(): - session.execute(statement.limit(1)) - return session - - async def initialize_database( engine: AsyncEngine, logger: BoundLogger, diff --git a/tests/database_test.py b/tests/database_test.py index 0f350684..7fec9fbd 100644 --- a/tests/database_test.py +++ b/tests/database_test.py @@ -18,7 +18,6 @@ _build_database_url, create_async_session, create_database_engine, - create_sync_session, datetime_from_db, datetime_to_db, initialize_database, @@ -69,31 +68,17 @@ async def test_database_init(database_url: str) -> None: def test_build_database_url(database_url: str) -> None: - url = _build_database_url(database_url, None, is_async=False) - assert url == database_url - - url = _build_database_url( - "postgresql://foo@127.0.0.1/foo", "password", is_async=False - ) - assert url == "postgresql://foo:password@127.0.0.1/foo" - - url = _build_database_url( - "postgresql://foo@127.0.0.1/foo", None, is_async=True - ) + url = _build_database_url("postgresql://foo@127.0.0.1/foo", None) assert url == "postgresql+asyncpg://foo@127.0.0.1/foo" - url = _build_database_url( - "postgresql://foo@127.0.0.1:5432/foo", None, is_async=True - ) + url = _build_database_url("postgresql://foo@127.0.0.1:5432/foo", None) assert url == "postgresql+asyncpg://foo@127.0.0.1:5432/foo" - url = _build_database_url( - "postgresql://foo@127.0.0.1/foo", "otherpass", is_async=True - ) + url = _build_database_url("postgresql://foo@127.0.0.1/foo", "otherpass") assert url == "postgresql+asyncpg://foo:otherpass@127.0.0.1/foo" url = _build_database_url( - "postgresql://foo@127.0.0.1:5433/foo", "otherpass", is_async=True + "postgresql://foo@127.0.0.1:5433/foo", "otherpass" ) assert url == "postgresql+asyncpg://foo:otherpass@127.0.0.1:5433/foo" @@ -101,11 +86,10 @@ def test_build_database_url(database_url: str) -> None: url = _build_database_url( "postgresql://foo%40e.com@127.0.0.1:4444/foo", "pass@word/with stuff", - is_async=False, ) assert url == ( - "postgresql://foo%40e.com:pass%40word%2Fwith%20stuff@127.0.0.1:4444" - "/foo" + "postgresql+asyncpg://foo%40e.com:pass%40word%2Fwith%20stuff" + "@127.0.0.1:4444/foo" ) parsed_url = urlparse(url) assert parsed_url.username @@ -142,36 +126,6 @@ async def test_create_async_session(database_url: str) -> None: await engine.dispose() -@pytest.mark.asyncio -async def test_create_sync_session(database_url: str) -> None: - logger = structlog.get_logger(__name__) - engine = create_database_engine(database_url, TEST_DATABASE_PASSWORD) - await initialize_database(engine, logger, schema=Base.metadata, reset=True) - await engine.dispose() - - session = create_sync_session( - database_url, - TEST_DATABASE_PASSWORD, - logger, - statement=select(User), - ) - with session.begin(): - session.add(User(username="foo")) - session.remove() - - # Use a query against a non-existent table as the liveness check and - # ensure that fails. - metadata = MetaData() - bad_table = Table("bad", metadata, Column("name", String(64))) - with pytest.raises(ProgrammingError): - session = create_sync_session( - database_url, - TEST_DATABASE_PASSWORD, - logger, - statement=select(bad_table), - ) - - def test_datetime() -> None: tz_aware = datetime.now(tz=UTC) tz_naive = tz_aware.replace(tzinfo=None)