Chapter 6 Exercise for the Reader: Separate UoW and Context Manager #47
Comments
@chrisgrf, here's how I've implemented this pattern in a few projects at my work:

service_layer/ports.py:

import typing as t


class IUnitOfWorkContext(t.Protocol):
    batches_repository: IBatchesRepository
    products_repository: IProductsRepository


class IUnitOfWork(t.ContextManager[IUnitOfWorkContext], t.Protocol):
    def commit(self) -> None:
        """
        Commit unit of work
        :raises UnitOfWorkNotStarted if called outside the context manager scope
        """
        ...

    def rollback(self) -> None:
        """
        Rollback unit of work
        :raises UnitOfWorkNotStarted if called outside the context manager scope
        """
        ...

Note: the protocol conveniently defines the base type

This is the concrete implementation:

from types import TracebackType
from typing import Type

# Assumption: Session and sessionmaker are SQLAlchemy's
from sqlalchemy.orm import Session, sessionmaker


class PostgresUnitOfWorkContext(IUnitOfWorkContext):
    session: Session

    def __init__(self, session: Session):
        self.session = session
        self.batches_repository = BatchesRepository(session)
        self.products_repository = ProductsRepository(session)


class PostgresUnitOfWork(IUnitOfWork):
    session_factory: sessionmaker
    context: PostgresUnitOfWorkContext | None

    def __init__(self, session_factory: sessionmaker):
        self.session_factory = session_factory
        self.context = None

    def __enter__(self) -> "PostgresUnitOfWorkContext":
        session = self.session_factory()
        self.context = PostgresUnitOfWorkContext(session)
        return self.context

    def __exit__(
        self,
        exc_type: Type[BaseException] | None,
        exc_val: BaseException | None,  # noqa: F841
        exc_tb: TracebackType | None,  # noqa: F841
    ) -> None:
        if not self.context:
            raise UnitOfWorkNotStarted()
        if exc_type is None:
            # Commit on exit by default
            self.commit()
        else:
            self.rollback()
        self.context = None

    def commit(self) -> None:
        if not self.context:
            raise UnitOfWorkNotStarted()
        self.context.session.commit()

    def rollback(self) -> None:
        if not self.context:
            raise UnitOfWorkNotStarted()
        self.context.session.rollback()

It can be used like this:

def some_command_handler(uow: IUnitOfWork, product_id: int):
    with uow as ctx:
        product = ctx.products_repository.get_by_id(product_id)
        ...

One of the reasons I implemented it like this is that it lets you centralise the UoW responsibility in the message bus instead of in the command handler, so you can execute multiple commands within the same UoW (this bends the rules, but it can be useful). The definition of your command handlers then becomes:

def some_command_handler(ctx: IUnitOfWorkContext, product_id: int):
    product = ctx.products_repository.get_by_id(product_id)
    ...

However, I've changed this a lot since I started using this pattern. Here are some thoughts/notes:
Hope that is helpful!
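To make the message bus idea in the comment above concrete, here is a minimal sketch of a bus that opens one unit of work and passes only the context to each handler. It is an illustration under assumptions, not the commenter's actual code: `InMemoryContext`, `InMemoryUnitOfWork`, `AddProduct`, `add_product` and `MessageBus` are hypothetical names, and a plain dict stands in for the repositories and the database session.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class InMemoryContext:
    # Hypothetical stand-in for IUnitOfWorkContext; a dict replaces the repositories.
    products: dict[int, str] = field(default_factory=dict)


class InMemoryUnitOfWork:
    # Hypothetical stand-in for PostgresUnitOfWork: commits on a clean exit,
    # discards the changes if the block raised.
    def __init__(self) -> None:
        self.committed: dict[int, str] = {}
        self.commits = 0
        self.rollbacks = 0

    def __enter__(self) -> InMemoryContext:
        # Each unit of work starts from a copy of the committed state.
        self._ctx = InMemoryContext(dict(self.committed))
        return self._ctx

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        if exc_type is None:
            self.committed = self._ctx.products
            self.commits += 1
        else:
            self.rollbacks += 1
        # Returning None lets any exception propagate to the caller.


@dataclass
class AddProduct:
    product_id: int
    name: str


def add_product(ctx: InMemoryContext, cmd: AddProduct) -> None:
    # The handler only sees the context; it never commits or rolls back.
    if not cmd.name:
        raise ValueError("product name must not be empty")
    ctx.products[cmd.product_id] = cmd.name


class MessageBus:
    def __init__(self, uow: Any, handlers: dict[type, Callable[[Any, Any], None]]) -> None:
        self.uow = uow
        self.handlers = handlers

    def handle(self, *commands: Any) -> None:
        # One unit of work wraps every command passed in this call.
        with self.uow as ctx:
            for command in commands:
                self.handlers[type(command)](ctx, command)
```

With this shape, `bus.handle(cmd_a, cmd_b)` either commits both commands together or, if any handler raises, commits neither.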
@gregbrowndev Thanks for a very detailed and interesting answer! However, our task was to separate the UoW from the context manager. How good is your version, given that the
My solution looks like this (unit_of_work.py):
and its usage (services.py):
In my opinion, this solution most explicitly shows that the UoW only has the necessary responsibilities.
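For readers following along, here is a minimal sketch of one possible reading of the exercise: the UoW object only knows how to commit and roll back, and a separate function built with `contextlib.contextmanager` decides when to call them. It is not the solution posted in this thread. `RecordingUnitOfWork` and `unit_of_work_scope` are hypothetical names, and committing automatically on a clean exit is an assumption about the desired behaviour.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol


class UnitOfWork(Protocol):
    # The UoW itself is only responsible for committing and rolling back.
    def commit(self) -> None: ...
    def rollback(self) -> None: ...


class RecordingUnitOfWork:
    # Hypothetical in-memory UoW that records calls; a real one would
    # wrap a database session and expose the repositories.
    def __init__(self) -> None:
        self.calls: list[str] = []

    def commit(self) -> None:
        self.calls.append("commit")

    def rollback(self) -> None:
        self.calls.append("rollback")


@contextmanager
def unit_of_work_scope(uow: UnitOfWork) -> Iterator[UnitOfWork]:
    # The context manager owns the lifecycle: commit on success,
    # roll back and re-raise on error.
    try:
        yield uow
    except Exception:
        uow.rollback()
        raise
    else:
        uow.commit()
```

Under this split, a fake for tests only has to implement `commit` and `rollback`, and the same `unit_of_work_scope` can wrap the fake and the real UoW alike.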
Could someone give a solution for this exercise?
Following is my code example. Would there be a point in splitting FakeUnitOfWork?
src/allocation/service_layer/unit_of_work.py