diff --git a/changelog.d/20240610_113548_rra_DM_44758.md b/changelog.d/20240610_113548_rra_DM_44758.md
new file mode 100644
index 00000000..b9669e01
--- /dev/null
+++ b/changelog.d/20240610_113548_rra_DM_44758.md
@@ -0,0 +1,3 @@
+### Backwards-incompatible changes
+
+- Drop `safir.database.create_sync_session`. This was only used by services that used Dramatiq for task management, since Dramatiq is sync-only. Services based on Safir should switch to arq and use only async database connections.
diff --git a/docs/user-guide/database.rst b/docs/user-guide/database.rst
index 6a837e62..7b0c6e3a 100644
--- a/docs/user-guide/database.rst
+++ b/docs/user-guide/database.rst
@@ -426,55 +426,10 @@ For example:
If the statement fails, it will be retried up to five times, waiting two seconds between attempts, before raising the underlying exception.
This is particularly useful for waiting for network or a database proxy to come up when a process has first started.
-Creating a sync database session
---------------------------------
-
-Although Safir is primarily intended to support asyncio applications, it may sometimes be necessary to write sync code that performs database operations.
-One example would be `Dramatiq `__ workers.
-This can be done with `~safir.database.create_sync_session`.
-
-.. code-block:: python
-
- from safir.database import create_sync_session
-
- from .config import config
-
-
- session = create_sync_session(config.database_url, config.database_password)
- with session.begin():
- # ... do something with the session ...
- pass
-
-Unlike `~safir.database.create_async_session`, `~safir.database.create_sync_session` handles creating the engine internally, since sync engines do not require any special shutdown measures.
-
-As with :ref:`async database sessions `, you can pass a `structlog`_ logger and a statement to perform a connection check on the database before returning the session:
-
-.. code-block:: python
-
- import structlog
- from safir.database import create_sync_session
- from sqlalchemy.future import select
-
- from .config import config
- from .schema import User
-
-
- logger = structlog.get_logger(config.logger_name)
- stmt = select(User)
- session = create_sync_session(
- config.database_url,
- config.database_password,
- logger,
- statement=stmt,
- )
-
-Applications that use `~safir.database.create_sync_session` must declare a dependency on `psycopg2 `__ in their pip dependencies.
-Safir itself does not depend on psycopg2, even with the ``db`` extra, since most applications that use Safir for database support will only need async sessions.
-
Setting an isolation level
--------------------------
-`~safir.database.create_database_engine`, `~safir.database.create_sync_session`, and the ``initialize`` method of `~safir.dependencies.db_session.db_session_dependency` take an optional ``isolation_level`` argument that can be used to set a non-default isolation level.
+`~safir.database.create_database_engine` and the ``initialize`` method of `~safir.dependencies.db_session.db_session_dependency` take an optional ``isolation_level`` argument that can be used to set a non-default isolation level.
If given, this parameter is passed through to the underlying SQLAlchemy engine.
See `the SQLAlchemy isolation level documentation `__ for more information.
diff --git a/src/safir/database.py b/src/safir/database.py
index db1111a6..d5aade53 100644
--- a/src/safir/database.py
+++ b/src/safir/database.py
@@ -3,13 +3,11 @@
from __future__ import annotations
import asyncio
-import time
from datetime import UTC, datetime, timedelta
from typing import overload
from urllib.parse import quote, urlparse
from pydantic import SecretStr
-from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import (
AsyncEngine,
@@ -17,7 +15,6 @@
async_sessionmaker,
create_async_engine,
)
-from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.schema import CreateSchema
from sqlalchemy.sql.expression import Select
from sqlalchemy.sql.schema import MetaData
@@ -27,7 +24,6 @@
"DatabaseInitializationError",
"create_async_session",
"create_database_engine",
- "create_sync_session",
"datetime_from_db",
"datetime_to_db",
"initialize_database",
@@ -38,9 +34,7 @@ class DatabaseInitializationError(Exception):
"""Database initialization failed."""
-def _build_database_url(
- url: str, password: str | SecretStr | None, *, is_async: bool
-) -> str:
+def _build_database_url(url: str, password: str | SecretStr | None) -> str:
"""Build the authenticated URL for the database.
Parameters
@@ -49,8 +43,6 @@ def _build_database_url(
Database connection URL, not including the password.
password
Database connection password.
- is_async
- Whether the resulting URL should be async or not.
Returns
-------
@@ -62,26 +54,24 @@ def _build_database_url(
ValueError
A password was provided but the connection URL has no username.
"""
- if is_async or password:
- parsed_url = urlparse(url)
- if is_async and parsed_url.scheme == "postgresql":
- parsed_url = parsed_url._replace(scheme="postgresql+asyncpg")
- if password:
- if isinstance(password, SecretStr):
- password = password.get_secret_value()
- if not parsed_url.username:
- raise ValueError(f"No username in database URL {url}")
- password = quote(password, safe="")
-
- # The username portion of the parsed URL does not appear to decode
- # URL escaping of the username, so we should not quote it again or
- # we will get double-quoting.
- netloc = f"{parsed_url.username}:{password}@{parsed_url.hostname}"
- if parsed_url.port:
- netloc = f"{netloc}:{parsed_url.port}"
- parsed_url = parsed_url._replace(netloc=netloc)
- url = parsed_url.geturl()
- return url
+ parsed_url = urlparse(url)
+ if parsed_url.scheme == "postgresql":
+ parsed_url = parsed_url._replace(scheme="postgresql+asyncpg")
+ if password:
+ if isinstance(password, SecretStr):
+ password = password.get_secret_value()
+ if not parsed_url.username:
+ raise ValueError(f"No username in database URL {url}")
+ password = quote(password, safe="")
+
+ # The username portion of the parsed URL does not appear to decode URL
+ # escaping of the username, so we should not quote it again or we will
+ # get double-quoting.
+ netloc = f"{parsed_url.username}:{password}@{parsed_url.hostname}"
+ if parsed_url.port:
+ netloc = f"{netloc}:{parsed_url.port}"
+ parsed_url = parsed_url._replace(netloc=netloc)
+ return parsed_url.geturl()
@overload
@@ -173,7 +163,7 @@ def create_database_engine(
ValueError
A password was provided but the connection URL has no username.
"""
- url = _build_database_url(url, password, is_async=True)
+ url = _build_database_url(url, password)
if isolation_level:
return create_async_engine(
url, future=True, isolation_level=isolation_level
@@ -243,81 +233,6 @@ async def create_async_session(
return session
-def create_sync_session(
- url: str,
- password: str | SecretStr | None,
- logger: BoundLogger | None = None,
- *,
- isolation_level: str | None = None,
- statement: Select | None = None,
-) -> scoped_session:
- """Create a new sync database session.
-
- Used instead of `create_database_engine` and `create_async_session` for
- sync code, such as Dramatiq workers. This combines engine creation with
- session creation.
-
- Parameters
- ----------
- url
- Database connection URL, not including the password.
- password
- Database connection password.
- logger
- Logger for reporting errors. Used only if a statement is provided.
- isolation_level
- If specified, sets a non-default isolation level for the database
- engine.
- statement
- If provided, statement to run to check database connectivity. This
- will be modified with ``limit(1)`` before execution. If not provided,
- database connectivity will not be checked.
-
- Returns
- -------
- sqlalchemy.orm.scoping.scoped_session
- The database session proxy. This manages a separate session per
- thread and therefore should be thread-safe.
-
- Raises
- ------
- ValueError
- A password was provided but the connection URL has no username.
- """
- url = _build_database_url(url, password, is_async=False)
- if isolation_level:
- engine = create_engine(url, isolation_level=isolation_level)
- else:
- engine = create_engine(url)
- factory = sessionmaker(bind=engine, future=True)
- session = scoped_session(factory)
-
- # If no statement was provided, just return the scoped_session.
- if statement is None:
- return session
-
- # A statement was provided, so we want to check connectivity and retry for
- # up to ten seconds before returning the session.
- for _ in range(5):
- try:
- with session.begin():
- session.execute(statement.limit(1))
- return session
- except (ConnectionRefusedError, OperationalError, OSError):
- if logger:
- logger.info("database not ready, waiting two seconds")
- session.remove()
- time.sleep(2)
- continue
-
- # If we got here, we failed five times. Try one last time without
- # catching exceptions so that we raise the appropriate exception to our
- # caller.
- with session.begin():
- session.execute(statement.limit(1))
- return session
-
-
async def initialize_database(
engine: AsyncEngine,
logger: BoundLogger,
diff --git a/tests/database_test.py b/tests/database_test.py
index 0f350684..7fec9fbd 100644
--- a/tests/database_test.py
+++ b/tests/database_test.py
@@ -18,7 +18,6 @@
_build_database_url,
create_async_session,
create_database_engine,
- create_sync_session,
datetime_from_db,
datetime_to_db,
initialize_database,
@@ -69,31 +68,17 @@ async def test_database_init(database_url: str) -> None:
def test_build_database_url(database_url: str) -> None:
- url = _build_database_url(database_url, None, is_async=False)
- assert url == database_url
-
- url = _build_database_url(
- "postgresql://foo@127.0.0.1/foo", "password", is_async=False
- )
- assert url == "postgresql://foo:password@127.0.0.1/foo"
-
- url = _build_database_url(
- "postgresql://foo@127.0.0.1/foo", None, is_async=True
- )
+ url = _build_database_url("postgresql://foo@127.0.0.1/foo", None)
assert url == "postgresql+asyncpg://foo@127.0.0.1/foo"
- url = _build_database_url(
- "postgresql://foo@127.0.0.1:5432/foo", None, is_async=True
- )
+ url = _build_database_url("postgresql://foo@127.0.0.1:5432/foo", None)
assert url == "postgresql+asyncpg://foo@127.0.0.1:5432/foo"
- url = _build_database_url(
- "postgresql://foo@127.0.0.1/foo", "otherpass", is_async=True
- )
+ url = _build_database_url("postgresql://foo@127.0.0.1/foo", "otherpass")
assert url == "postgresql+asyncpg://foo:otherpass@127.0.0.1/foo"
url = _build_database_url(
- "postgresql://foo@127.0.0.1:5433/foo", "otherpass", is_async=True
+ "postgresql://foo@127.0.0.1:5433/foo", "otherpass"
)
assert url == "postgresql+asyncpg://foo:otherpass@127.0.0.1:5433/foo"
@@ -101,11 +86,10 @@ def test_build_database_url(database_url: str) -> None:
url = _build_database_url(
"postgresql://foo%40e.com@127.0.0.1:4444/foo",
"pass@word/with stuff",
- is_async=False,
)
assert url == (
- "postgresql://foo%40e.com:pass%40word%2Fwith%20stuff@127.0.0.1:4444"
- "/foo"
+ "postgresql+asyncpg://foo%40e.com:pass%40word%2Fwith%20stuff"
+ "@127.0.0.1:4444/foo"
)
parsed_url = urlparse(url)
assert parsed_url.username
@@ -142,36 +126,6 @@ async def test_create_async_session(database_url: str) -> None:
await engine.dispose()
-@pytest.mark.asyncio
-async def test_create_sync_session(database_url: str) -> None:
- logger = structlog.get_logger(__name__)
- engine = create_database_engine(database_url, TEST_DATABASE_PASSWORD)
- await initialize_database(engine, logger, schema=Base.metadata, reset=True)
- await engine.dispose()
-
- session = create_sync_session(
- database_url,
- TEST_DATABASE_PASSWORD,
- logger,
- statement=select(User),
- )
- with session.begin():
- session.add(User(username="foo"))
- session.remove()
-
- # Use a query against a non-existent table as the liveness check and
- # ensure that fails.
- metadata = MetaData()
- bad_table = Table("bad", metadata, Column("name", String(64)))
- with pytest.raises(ProgrammingError):
- session = create_sync_session(
- database_url,
- TEST_DATABASE_PASSWORD,
- logger,
- statement=select(bad_table),
- )
-
-
def test_datetime() -> None:
tz_aware = datetime.now(tz=UTC)
tz_naive = tz_aware.replace(tzinfo=None)