I am tried to implement transactions in fastapi project. (routes -> Services layer -> Unit of Work -> Repositories)
This is my base UOW:
class BaseUnitOfWork(UnitOfWorkInterface):
def __init__(self, session_factory: Callable[[], AsyncSession]) -> None:
self._session_factory = session_factory
async def __aenter__(self) -> Self:
self.session = self._session_factory()
self._init_repos()
return self
@abstractmethod
def _init_repos(self) -> None:
"""
Usage:
- Inherit BaseUnitOfWork and define this method
- use method _register_repo to attach db session to you're repositories
Example:
from your_repositories import User, Profile
class MyUnitOfWork(BaseUnitOfWork):
def _init_repos(self):
self.users = self._register_repo(UserRepository)
self.profiles = self._register_repo(ProfileRepository)
"""
pass
def _register_repo(self, repo: type[R]) -> R:
return repo(self.session)
async def __aexit__(
self,
exception: type[BaseException] | None,
value: BaseException | None,
traceback: TracebackType | None,
) -> None:
if exception:
await self.session.rollback()
else:
await self.session.commit()
await self.session.close()
async def save(self):
await self.session.commit()
async def undo(self):
await self.session.rollback()
This is my app-level UOW:
class UnitOfWork(BaseUnitOfWork):
def _init_repos(self) -> None:
self.users = self._register_repo(UserDAL)
self.vocabularies = self._register_repo(VocabularySetDAL)
self.language_pairs = self._register_repo(LanguagePairDAL)
UOW is used as dependency in Service layer:
class UserService:
def __init__(self, uow: UnitOfWork) -> None:
self._uow = uow
async def get_or_create_by_id(self, id: int) -> User:
async with self._uow as uow:
user = await uow.users.get_or_create(id=id)
return user
async def get_by_id(self, id: int) -> User | None:
async with self._uow as uow:
user = await uow.users.get_by_id(id)
return user
And then I create services components via:
users_service = UserService(UnitOfWork(async_session_maker))
vocabularies_service = VocabularyService(UnitOfWork(async_session_maker))
And use it by importing services to my routes.
Then i call my regular api endpoints all works well, but if i open my starlette admin dashboard, I’ve receive error:
This session is provisioning a new connection; concurrent operations are not permitted
This occurs only when I open the edit page with model that need to load additional child models.
And further error displays when I terminate app:
ERROR:sqlalchemy.pool.impl.AsyncAdaptedQueuePool:The garbage collector is trying to clean up non-checked-in connection <AdaptedConnection <asyncpg.connection.Connection object at 0x112d2a020>>, which will be terminated. Please ensure that SQLAlchemy pooled connections are returned to the pool explicitly, either by calling ``close()`` or by using appropriate context managers to manage their lifecycle.
it seems the problem in close()
method I’ve called in my uow context manager (SQLAlchemy asyncio
) and sessions that uow creates may conflicts with starlete-admin sessions (Starlete anyio
) that creates under the hood.
I’ve tried to wrap my close()
method to shield
asyncio method:
async def __aexit__(
self,
exception: type[BaseException] | None,
value: BaseException | None,
traceback: TracebackType | None,
) -> None:
if exception:
await self.session.rollback()
else:
await self.session.commit()
await asyncio.shield(self.session.close())
But it not caused any changes.
The full message of exception:
ERROR: Exception in ASGI application
+ Exception Group Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/contrib/sqla/middleware.py", line 24, in dispatch
| return await call_next(request)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
| raise app_exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/sessions.py", line 86, in __call__
| await self.app(scope, receive, send_wrapper)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
| async with anyio.create_task_group() as task_group:
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/auth.py", line 277, in dispatch
| await self.provider.is_authenticated(request)
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/admin/auth.py", line 53, in is_authenticated
| current_user = await AuthService.get_user_from_token(token)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/auth/auth.py", line 42, in get_user_from_token
| user = await users_service.get_by_id(user_id)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/users/services.py", line 18, in get_by_id
| async with self._uow as uow:
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/db/unitofwork.py", line 71, in __aexit__
| await self.session.commit()
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 1011, in commit
| await greenlet_spawn(self.sync_session.commit)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 179, in greenlet_spawn
| result = context.switch(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1969, in commit
| trans.commit(_to_root=True)
| File "<string>", line 2, in commit
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
| self._raise_for_prerequisite_state(fn.__name__, current_state)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 946, in _raise_for_prerequisite_state
| raise sa_exc.InvalidRequestError(
| sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)
+------------------------------------
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
| result = await app( # type: ignore[func-returns-value]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
| return await self.app(scope, receive, send)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 292, in __call__
| await super().__call__(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
| raise exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
| await self.app(scope, receive, _send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
| raise exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
| await self.app(scope, receive, sender)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
| raise e
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
| await self.app(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
| await route.handle(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/routing.py", line 443, in handle
| await self.app(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
| await self.middleware_stack(scope, receive, send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
| raise exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
| await self.app(scope, receive, _send)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
| async with anyio.create_task_group() as task_group:
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 97, in receive
| return self.receive_nowait()
| ^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 92, in receive_nowait
| raise WouldBlock
| anyio.WouldBlock
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 78, in call_next
| message = await recv_stream.receive()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/streams/memory.py", line 112, in receive
| raise EndOfStream
| anyio.EndOfStream
|
| During handling of the above exception, another exception occurred:
|
| Exception Group Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/contrib/sqla/middleware.py", line 24, in dispatch
| return await call_next(request)
| ^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 84, in call_next
| raise app_exc
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 70, in coro
| await self.app(scope, receive_or_disconnect, send_no_error)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/sessions.py", line 86, in __call__
| await self.app(scope, receive, send_wrapper)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 106, in __call__
| async with anyio.create_task_group() as task_group:
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette/middleware/base.py", line 108, in __call__
| response = await self.dispatch_func(request, call_next)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/starlette_admin/auth.py", line 277, in dispatch
| await self.provider.is_authenticated(request)
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/admin/auth.py", line 53, in is_authenticated
| current_user = await AuthService.get_user_from_token(token)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/auth/auth.py", line 42, in get_user_from_token
| user = await users_service.get_by_id(user_id)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/users/services.py", line 18, in get_by_id
| async with self._uow as uow:
| File "/Users/apple/Desktop/proggraming/fastbot/app/backend/db/unitofwork.py", line 71, in __aexit__
| await self.session.commit()
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 1011, in commit
| await greenlet_spawn(self.sync_session.commit)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 179, in greenlet_spawn
| result = context.switch(*args, **kwargs)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1969, in commit
| trans.commit(_to_root=True)
| File "<string>", line 2, in commit
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
| self._raise_for_prerequisite_state(fn.__name__, current_state)
| File "/Users/apple/Desktop/proggraming/fastbot/.venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 946, in _raise_for_prerequisite_state
| raise sa_exc.InvalidRequestError(
| sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)