Skip to content

Commit

Permalink
Drop safir.database.create_sync_session
Browse files Browse the repository at this point in the history
Drop support for creating sync database sessions. This was added for
vo-cutouts, which used Dramatiq as the task queuing system, since
Dramatiq is sync-only. We're standardizing on arq and on async Python
in general, and this code will no longer have any known callers after
vo-cutouts has been updated.
  • Loading branch information
rra committed Jun 10, 2024
1 parent 58a6ec8 commit 1d71fd3
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 202 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20240610_113548_rra_DM_44758.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Backwards-incompatible changes

- Drop `safir.database.create_sync_session`. This was only used by services that used Dramatiq for task management, since Dramatiq is sync-only. Services based on Safir should switch to arq and use only async database connections.
45 changes: 0 additions & 45 deletions docs/user-guide/database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -426,51 +426,6 @@ For example:
If the statement fails, it will be retried up to five times, waiting two seconds between attempts, before raising the underlying exception.
This is particularly useful for waiting for network or a database proxy to come up when a process has first started.

Creating a sync database session
--------------------------------

Although Safir is primarily intended to support asyncio applications, it may sometimes be necessary to write sync code that performs database operations.
One example would be `Dramatiq <https://dramatiq.io/>`__ workers.
This can be done with `~safir.database.create_sync_session`.

.. code-block:: python
from safir.database import create_sync_session
from .config import config
session = create_sync_session(config.database_url, config.database_password)
with session.begin():
# ... do something with the session ...
pass
Unlike `~safir.database.create_async_session`, `~safir.database.create_sync_session` handles creating the engine internally, since sync engines do not require any special shutdown measures.

As with :ref:`async database sessions <probing-db-connection>`, you can pass a `structlog`_ logger and a statement to perform a connection check on the database before returning the session:

.. code-block:: python
import structlog
from safir.database import create_sync_session
from sqlalchemy.future import select
from .config import config
from .schema import User
logger = structlog.get_logger(config.logger_name)
stmt = select(User)
session = create_sync_session(
config.database_url,
config.database_password,
logger,
statement=stmt,
)
Applications that use `~safir.database.create_sync_session` must declare a dependency on `psycopg2 <https://pypi.org/project/psycopg2/>`__ in their pip dependencies.
Safir itself does not depend on psycopg2, even with the ``db`` extra, since most applications that use Safir for database support will only need async sessions.

Setting an isolation level
--------------------------

Expand Down
125 changes: 20 additions & 105 deletions src/safir/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,18 @@
from __future__ import annotations

import asyncio
import time
from datetime import UTC, datetime, timedelta
from typing import overload
from urllib.parse import quote, urlparse

from pydantic import SecretStr
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import (
AsyncEngine,
async_scoped_session,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.schema import CreateSchema
from sqlalchemy.sql.expression import Select
from sqlalchemy.sql.schema import MetaData
Expand All @@ -27,7 +24,6 @@
"DatabaseInitializationError",
"create_async_session",
"create_database_engine",
"create_sync_session",
"datetime_from_db",
"datetime_to_db",
"initialize_database",
Expand All @@ -38,9 +34,7 @@ class DatabaseInitializationError(Exception):
"""Database initialization failed."""


def _build_database_url(
url: str, password: str | SecretStr | None, *, is_async: bool
) -> str:
def _build_database_url(url: str, password: str | SecretStr | None) -> str:
"""Build the authenticated URL for the database.
Parameters
Expand All @@ -49,8 +43,6 @@ def _build_database_url(
Database connection URL, not including the password.
password
Database connection password.
is_async
Whether the resulting URL should be async or not.
Returns
-------
Expand All @@ -62,26 +54,24 @@ def _build_database_url(
ValueError
A password was provided but the connection URL has no username.
"""
if is_async or password:
parsed_url = urlparse(url)
if is_async and parsed_url.scheme == "postgresql":
parsed_url = parsed_url._replace(scheme="postgresql+asyncpg")
if password:
if isinstance(password, SecretStr):
password = password.get_secret_value()
if not parsed_url.username:
raise ValueError(f"No username in database URL {url}")
password = quote(password, safe="")

# The username portion of the parsed URL does not appear to decode
# URL escaping of the username, so we should not quote it again or
# we will get double-quoting.
netloc = f"{parsed_url.username}:{password}@{parsed_url.hostname}"
if parsed_url.port:
netloc = f"{netloc}:{parsed_url.port}"
parsed_url = parsed_url._replace(netloc=netloc)
url = parsed_url.geturl()
return url
parsed_url = urlparse(url)
if parsed_url.scheme == "postgresql":
parsed_url = parsed_url._replace(scheme="postgresql+asyncpg")
if password:
if isinstance(password, SecretStr):
password = password.get_secret_value()
if not parsed_url.username:
raise ValueError(f"No username in database URL {url}")
password = quote(password, safe="")

# The username portion of the parsed URL does not appear to decode URL
# escaping of the username, so we should not quote it again or we will
# get double-quoting.
netloc = f"{parsed_url.username}:{password}@{parsed_url.hostname}"
if parsed_url.port:
netloc = f"{netloc}:{parsed_url.port}"
parsed_url = parsed_url._replace(netloc=netloc)
return parsed_url.geturl()


@overload
Expand Down Expand Up @@ -173,7 +163,7 @@ def create_database_engine(
ValueError
A password was provided but the connection URL has no username.
"""
url = _build_database_url(url, password, is_async=True)
url = _build_database_url(url, password)
if isolation_level:
return create_async_engine(
url, future=True, isolation_level=isolation_level
Expand Down Expand Up @@ -243,81 +233,6 @@ async def create_async_session(
return session


def create_sync_session(
url: str,
password: str | SecretStr | None,
logger: BoundLogger | None = None,
*,
isolation_level: str | None = None,
statement: Select | None = None,
) -> scoped_session:
"""Create a new sync database session.
Used instead of `create_database_engine` and `create_async_session` for
sync code, such as Dramatiq workers. This combines engine creation with
session creation.
Parameters
----------
url
Database connection URL, not including the password.
password
Database connection password.
logger
Logger for reporting errors. Used only if a statement is provided.
isolation_level
If specified, sets a non-default isolation level for the database
engine.
statement
If provided, statement to run to check database connectivity. This
will be modified with ``limit(1)`` before execution. If not provided,
database connectivity will not be checked.
Returns
-------
sqlalchemy.orm.scoping.scoped_session
The database session proxy. This manages a separate session per
thread and therefore should be thread-safe.
Raises
------
ValueError
A password was provided but the connection URL has no username.
"""
url = _build_database_url(url, password, is_async=False)
if isolation_level:
engine = create_engine(url, isolation_level=isolation_level)
else:
engine = create_engine(url)
factory = sessionmaker(bind=engine, future=True)
session = scoped_session(factory)

# If no statement was provided, just return the scoped_session.
if statement is None:
return session

# A statement was provided, so we want to check connectivity and retry for
# up to ten seconds before returning the session.
for _ in range(5):
try:
with session.begin():
session.execute(statement.limit(1))
return session
except (ConnectionRefusedError, OperationalError, OSError):
if logger:
logger.info("database not ready, waiting two seconds")
session.remove()
time.sleep(2)
continue

# If we got here, we failed five times. Try one last time without
# catching exceptions so that we raise the appropriate exception to our
# caller.
with session.begin():
session.execute(statement.limit(1))
return session


async def initialize_database(
engine: AsyncEngine,
logger: BoundLogger,
Expand Down
58 changes: 6 additions & 52 deletions tests/database_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
_build_database_url,
create_async_session,
create_database_engine,
create_sync_session,
datetime_from_db,
datetime_to_db,
initialize_database,
Expand Down Expand Up @@ -69,43 +68,28 @@ async def test_database_init(database_url: str) -> None:


def test_build_database_url(database_url: str) -> None:
url = _build_database_url(database_url, None, is_async=False)
assert url == database_url

url = _build_database_url(
"postgresql://[email protected]/foo", "password", is_async=False
)
assert url == "postgresql://foo:[email protected]/foo"

url = _build_database_url(
"postgresql://[email protected]/foo", None, is_async=True
)
url = _build_database_url("postgresql://[email protected]/foo", None)
assert url == "postgresql+asyncpg://[email protected]/foo"

url = _build_database_url(
"postgresql://[email protected]:5432/foo", None, is_async=True
)
url = _build_database_url("postgresql://[email protected]:5432/foo", None)
assert url == "postgresql+asyncpg://[email protected]:5432/foo"

url = _build_database_url(
"postgresql://[email protected]/foo", "otherpass", is_async=True
)
url = _build_database_url("postgresql://[email protected]/foo", "otherpass")
assert url == "postgresql+asyncpg://foo:[email protected]/foo"

url = _build_database_url(
"postgresql://[email protected]:5433/foo", "otherpass", is_async=True
"postgresql://[email protected]:5433/foo", "otherpass"
)
assert url == "postgresql+asyncpg://foo:[email protected]:5433/foo"

# Test that the username and password are quoted properly.
url = _build_database_url(
"postgresql://foo%[email protected]:4444/foo",
"pass@word/with stuff",
is_async=False,
)
assert url == (
"postgresql://foo%40e.com:pass%40word%2Fwith%20stuff@127.0.0.1:4444"
"/foo"
"postgresql+asyncpg://foo%40e.com:pass%40word%2Fwith%20stuff"
"@127.0.0.1:4444/foo"
)
parsed_url = urlparse(url)
assert parsed_url.username
Expand Down Expand Up @@ -142,36 +126,6 @@ async def test_create_async_session(database_url: str) -> None:
await engine.dispose()


@pytest.mark.asyncio
async def test_create_sync_session(database_url: str) -> None:
logger = structlog.get_logger(__name__)
engine = create_database_engine(database_url, TEST_DATABASE_PASSWORD)
await initialize_database(engine, logger, schema=Base.metadata, reset=True)
await engine.dispose()

session = create_sync_session(
database_url,
TEST_DATABASE_PASSWORD,
logger,
statement=select(User),
)
with session.begin():
session.add(User(username="foo"))
session.remove()

# Use a query against a non-existent table as the liveness check and
# ensure that fails.
metadata = MetaData()
bad_table = Table("bad", metadata, Column("name", String(64)))
with pytest.raises(ProgrammingError):
session = create_sync_session(
database_url,
TEST_DATABASE_PASSWORD,
logger,
statement=select(bad_table),
)


def test_datetime() -> None:
tz_aware = datetime.now(tz=UTC)
tz_naive = tz_aware.replace(tzinfo=None)
Expand Down

0 comments on commit 1d71fd3

Please sign in to comment.