Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sqlalchemy): Optimized upsert_many operation #2254

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions litestar/contrib/sqlalchemy/operations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from sqlalchemy import ClauseElement, ColumnElement, UpdateBase
from sqlalchemy.ext.compiler import compiles

if TYPE_CHECKING:
from typing import Literal, Self

from sqlalchemy.sql.compiler import StrSQLCompiler


class MergeClause(ClauseElement):
__visit_name__ = "merge_into_clause"

def __init__(self, command: Literal["INSERT", "UPDATE", "DELETE"]) -> None:
self.on_sets: dict[str, ColumnElement[Any]] = {}
self.predicate: ColumnElement[bool] | None = None
self.command = command

def values(self, **kwargs: ColumnElement[Any]) -> Self:
self.on_sets = kwargs
return self

def where(self, expr: ColumnElement[bool]) -> Self:
self.predicate = expr
return self


@compiles(MergeClause) # type: ignore[no-untyped-call, misc]
def visit_merge_into_clause(element: MergeClause, compiler: StrSQLCompiler, **kw: Any) -> str:
case_predicate = ""
if element.predicate is not None:
case_predicate = f" AND {element.predicate._compiler_dispatch(compiler, **kw)!s}"

if element.command == "INSERT":
sets, sets_tos = list(element.on_sets), list(element.on_sets.values())
if kw.get("deterministic", False):
sorted_on_sets = dict(sorted(element.on_sets.items(), key=lambda x: x[0]))
sets, sets_tos = list(sorted_on_sets), list(sorted_on_sets.values())

merge_insert = ", ".join(sets)
values = ", ".join(e._compiler_dispatch(compiler, **kw) for e in sets_tos)
return f"WHEN NOT MATCHED{case_predicate} THEN {element.command} ({merge_insert}) VALUES ({values})"

set_list = list(element.on_sets.items())
if kw.get("deterministic", False):
set_list.sort(key=lambda x: x[0])

# merge update or merge delete
merge_action = ""
values = ""

if element.on_sets:
values = ", ".join(f"{name} = {column._compiler_dispatch(compiler, **kw)}" for name, column in set_list)
merge_action = f" SET {values}"

return f"WHEN MATCHED{case_predicate} THEN {element.command}{merge_action}"


class MergeInto(UpdateBase):
__visit_name__ = "merge_into"
_bind = None
inherit_cache = True

def __init__(self, target: Any, source: Any, on: Any) -> None:
self.target = target
self.source = source
self.on = on
self.clauses: list[ClauseElement] = []

def when_matched_then_update(self) -> MergeClause:
self.clauses.append(clause := MergeClause("UPDATE"))
return clause

def when_matched_then_delete(self) -> MergeClause:
self.clauses.append(clause := MergeClause("DELETE"))
return clause

def when_not_matched_then_insert(self) -> MergeClause:
self.clauses.append(clause := MergeClause("INSERT"))
return clause


@compiles(MergeInto) # type: ignore[no-untyped-call, misc]
def visit_merge_into(element: MergeInto, compiler: StrSQLCompiler, **kw: Any) -> str:
clauses = " ".join(clause._compiler_dispatch(compiler, **kw) for clause in element.clauses)
sql_text = f"MERGE INTO {element.target} USING {element.source} ON {element.on}"

if clauses:
sql_text += f" {clauses}"

return sql_text
9 changes: 9 additions & 0 deletions litestar/contrib/sqlalchemy/repository/_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,15 @@ async def upsert_many(
NotFoundError: If no instance found with same identifier as ``data``.
"""
instances = []
_operation = "BASIC"
is_postgres_15 = self._dialect.name == "postgresql" and (
self._dialect.server_version_info is not None and self._dialect.server_version_info[0] >= 15
)
if self._dialect.name == "oracle" or (self._dialect.name == "postgresql" and is_postgres_15):
_operation = "MERGE"
elif self._dialect.name in {"sqlite", "duckdb", "mysql", "postgresql"}:
_operation = "INSERT_ON_EXCEPTION"

with wrap_sqlalchemy_exception():
for datum in data:
instance = await self._attach_to_session(datum, strategy="merge")
Expand Down
9 changes: 9 additions & 0 deletions litestar/contrib/sqlalchemy/repository/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,15 @@ def upsert_many(
NotFoundError: If no instance found with same identifier as ``data``.
"""
instances = []
_operation = "BASIC"
is_postgres_15 = self._dialect.name == "postgresql" and (
self._dialect.server_version_info is not None and self._dialect.server_version_info[0] >= 15
)
if self._dialect.name == "oracle" or (self._dialect.name == "postgresql" and is_postgres_15):
_operation = "MERGE"
elif self._dialect.name in {"sqlite", "duckdb", "mysql", "postgresql"}:
_operation = "INSERT_ON_EXCEPTION"

with wrap_sqlalchemy_exception():
for datum in data:
instance = self._attach_to_session(datum, strategy="merge")
Expand Down
Loading