Перейти к содержанию

Unit of Work (Единица работы)

Модуль uow реализует паттерн "Unit of Work" для управления транзакциями и доступа к репозиториям в сервисном слое приложения.

Особенности

  • Абстракция для управления транзакциями
  • Поддержка асинхронных операций
  • Интеграция с SQLAlchemy
  • Автоматическое управление жизненным циклом сессии
  • Поддержка отката изменений

Документация API

AbstractUnitOfWork

Абстрактный базовый класс для реализации паттерна Unit of Work.

Bases: ABC

Абстрактный базовый класс Unit of Work (Единица работы).

Предоставляет интерфейс для управления транзакциями и доступа к репозиториям.

Атрибуты

users: Репозиторий для работы с пользователями.

Source code in src/service_layer/uow.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class AbstractUnitOfWork(abc.ABC):
    """Абстрактный базовый класс Unit of Work (Единица работы).

    Предоставляет интерфейс для управления транзакциями и доступа к репозиториям.

    Атрибуты:
        users: Репозиторий для работы с пользователями.
    """

    users: ABCUsersRepository

    async def __aenter__(self) -> 'AbstractUnitOfWork':
        """Вход в контекстный менеджер.

        Returns:
            AbstractUnitOfWork: Текущий экземпляр Unit of Work.

        """
        return self

    async def __aexit__(self, *args) -> None:
        """Выход из контекстного менеджера с откатом несохраненных изменений.

        Args:
            *args: Аргументы исключения, если оно произошло в блоке with.

        """
        await self.rollback()

    async def commit(self) -> None:
        """Фиксирует все изменения в рамках текущей единицы работы.

        Raises:
            Exception: Если произошла ошибка при фиксации изменений.

        """
        await self._commit()

    @abc.abstractmethod
    async def _commit(self) -> None:
        """Абстрактный метод для реализации фиксации изменений.

        Должен быть переопределен в подклассах для конкретной реализации.

        Raises:
            NotImplementedError: Если метод не переопределен в подклассе.

        """
        raise NotImplementedError

    @abc.abstractmethod
    async def rollback(self) -> None:
        """Абстрактный метод для отката изменений.

        Должен быть переопределен в подклассах для конкретной реализации.

        Raises:
            NotImplementedError: Если метод не переопределен в подклассе.

        """
        raise NotImplementedError

__aenter__() -> AbstractUnitOfWork async

Вход в контекстный менеджер.

Returns:

Name Type Description
AbstractUnitOfWork AbstractUnitOfWork

Текущий экземпляр Unit of Work.

Source code in src/service_layer/uow.py
25
26
27
28
29
30
31
32
async def __aenter__(self) -> 'AbstractUnitOfWork':
    """Вход в контекстный менеджер.

    Returns:
        AbstractUnitOfWork: Текущий экземпляр Unit of Work.

    """
    return self

__aexit__(*args) -> None async

Выход из контекстного менеджера с откатом несохраненных изменений.

Parameters:

Name Type Description Default
*args

Аргументы исключения, если оно произошло в блоке with.

()
Source code in src/service_layer/uow.py
34
35
36
37
38
39
40
41
async def __aexit__(self, *args) -> None:
    """Выход из контекстного менеджера с откатом несохраненных изменений.

    Args:
        *args: Аргументы исключения, если оно произошло в блоке with.

    """
    await self.rollback()

commit() -> None async

Фиксирует все изменения в рамках текущей единицы работы.

Raises:

Type Description
Exception

Если произошла ошибка при фиксации изменений.

Source code in src/service_layer/uow.py
43
44
45
46
47
48
49
50
async def commit(self) -> None:
    """Фиксирует все изменения в рамках текущей единицы работы.

    Raises:
        Exception: Если произошла ошибка при фиксации изменений.

    """
    await self._commit()

rollback() -> None abstractmethod async

Абстрактный метод для отката изменений.

Должен быть переопределен в подклассах для конкретной реализации.

Raises:

Type Description
NotImplementedError

Если метод не переопределен в подклассе.

Source code in src/service_layer/uow.py
64
65
66
67
68
69
70
71
72
73
74
@abc.abstractmethod
async def rollback(self) -> None:
    """Абстрактный метод для отката изменений.

    Должен быть переопределен в подклассах для конкретной реализации.

    Raises:
        NotImplementedError: Если метод не переопределен в подклассе.

    """
    raise NotImplementedError

SqlAlchemyUnitOfWork

Реализация Unit of Work для работы с SQLAlchemy.

Bases: AbstractUnitOfWork

Unit of Work для работы с SQLAlchemy.

Обеспечивает управление сессиями базы данных и транзакциями с использованием SQLAlchemy.

Source code in src/service_layer/uow.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    """Unit of Work для работы с SQLAlchemy.

    Обеспечивает управление сессиями базы данных и транзакциями
    с использованием SQLAlchemy.
    """

    def __init__(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        repo_factory: ABCUsersRepositoryFactory,
    ) -> None:
        """Инициализация SqlAlchemyUnitOfWork.

        Args:
            session_factory: Фабрика для создания асинхронных сессий SQLAlchemy.
            repo_factory: Фадрика создания репозитория для работы с данными

        """
        self.session_factory = session_factory
        self.repo_factory = repo_factory

    async def __aenter__(self) -> 'AbstractUnitOfWork':
        """Вход в контекстный менеджер.

        Инициализирует сессию базы данных и репозитории.

        Returns:
            SqlAlchemyUnitOfWork: Текущий экземпляр Unit of Work.

        """
        self.session: AsyncSession = self.session_factory()
        self.users = self.repo_factory.create(self.session)
        return await super().__aenter__()

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Выход из контекстного менеджера.

        При возникновении исключения выполняет откат изменений,
        иначе фиксирует изменения. В любом случае закрывает сессию.

        Args:
            exc_type: Тип исключения, если оно произошло, иначе None.
            exc_val: Экземпляр исключения, если оно произошло, иначе None.
            exc_tb: Трассировка стека, если исключение произошло, иначе None.

        """
        if exc_type:
            await self.rollback()
        else:
            await self._commit()
        await self.session.close()

    async def _commit(self) -> None:
        """Фиксирует изменения в базе данных.

        Raises:
            SQLAlchemyError: Если произошла ошибка при фиксации транзакции.

        """
        await self.session.commit()

    async def rollback(self) -> None:
        """Выполняет откат текущей транзакции.

        Отменяет все изменения, сделанные в рамках текущей сессии.
        """
        await self.session.rollback()

__aenter__() -> AbstractUnitOfWork async

Вход в контекстный менеджер.

Инициализирует сессию базы данных и репозитории.

Returns:

Name Type Description
SqlAlchemyUnitOfWork AbstractUnitOfWork

Текущий экземпляр Unit of Work.

Source code in src/service_layer/uow.py
 99
100
101
102
103
104
105
106
107
108
109
110
async def __aenter__(self) -> 'AbstractUnitOfWork':
    """Вход в контекстный менеджер.

    Инициализирует сессию базы данных и репозитории.

    Returns:
        SqlAlchemyUnitOfWork: Текущий экземпляр Unit of Work.

    """
    self.session: AsyncSession = self.session_factory()
    self.users = self.repo_factory.create(self.session)
    return await super().__aenter__()

__aexit__(exc_type, exc_val, exc_tb) -> None async

Выход из контекстного менеджера.

При возникновении исключения выполняет откат изменений, иначе фиксирует изменения. В любом случае закрывает сессию.

Parameters:

Name Type Description Default
exc_type

Тип исключения, если оно произошло, иначе None.

required
exc_val

Экземпляр исключения, если оно произошло, иначе None.

required
exc_tb

Трассировка стека, если исключение произошло, иначе None.

required
Source code in src/service_layer/uow.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Выход из контекстного менеджера.

    При возникновении исключения выполняет откат изменений,
    иначе фиксирует изменения. В любом случае закрывает сессию.

    Args:
        exc_type: Тип исключения, если оно произошло, иначе None.
        exc_val: Экземпляр исключения, если оно произошло, иначе None.
        exc_tb: Трассировка стека, если исключение произошло, иначе None.

    """
    if exc_type:
        await self.rollback()
    else:
        await self._commit()
    await self.session.close()

__init__(session_factory: async_sessionmaker[AsyncSession], repo_factory: ABCUsersRepositoryFactory) -> None

Инициализация SqlAlchemyUnitOfWork.

Parameters:

Name Type Description Default
session_factory async_sessionmaker[AsyncSession]

Фабрика для создания асинхронных сессий SQLAlchemy.

required
repo_factory ABCUsersRepositoryFactory

Фадрика создания репозитория для работы с данными

required
Source code in src/service_layer/uow.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(
    self,
    session_factory: async_sessionmaker[AsyncSession],
    repo_factory: ABCUsersRepositoryFactory,
) -> None:
    """Инициализация SqlAlchemyUnitOfWork.

    Args:
        session_factory: Фабрика для создания асинхронных сессий SQLAlchemy.
        repo_factory: Фадрика создания репозитория для работы с данными

    """
    self.session_factory = session_factory
    self.repo_factory = repo_factory

rollback() -> None async

Выполняет откат текущей транзакции.

Отменяет все изменения, сделанные в рамках текущей сессии.

Source code in src/service_layer/uow.py
139
140
141
142
143
144
async def rollback(self) -> None:
    """Выполняет откат текущей транзакции.

    Отменяет все изменения, сделанные в рамках текущей сессии.
    """
    await self.session.rollback()

Примеры использования

Базовое использование

from sqlalchemy.ext.asyncio import async_sessionmaker
from src.service_layer.uow import SqlAlchemyUnitOfWork
from src.adapters.factory import SQLAlchemyUsersRepositoryFactory

# Инициализация фабрики сессий и репозиториев
session_factory = async_sessionmaker(engine, expire_on_commit=False)
repo_factory = SQLAlchemyUsersRepositoryFactory(hasher)

# Использование Unit of Work
async with SqlAlchemyUnitOfWork(session_factory, repo_factory) as uow:
    # Работа с пользователями через репозиторий
    user = await uow.users.get_by_email("user@example.com")
    user.update_last_login()

    # Фиксация изменений
    await uow.commit()

Обработка ошибок

from sqlalchemy import exc

try:
    async with SqlAlchemyUnitOfWork(session_factory, repo_factory) as uow:
        user = await uow.users.get_by_id(1)
        user.balance -= amount
        await uow.commit()
except exc.SQLAlchemyError as e:
    logger.error(f"Ошибка при обновлении баланса: {e}")
    raise

Рекомендации по использованию

  1. Управление транзакциями
  2. Всегда используйте commit() для сохранения изменений
  3. Обрабатывайте исключения и используйте rollback() при ошибках
  4. Избегайте вложенных блоков Unit of Work

  5. Работа с репозиториями

  6. Получайте доступ к репозиториям через атрибуты UoW
  7. Не сохраняйте ссылки на репозитории вне контекста UoW
  8. Используйте фабрики репозиториев для создания экземпляров

  9. Производительность

  10. Минимизируйте время удержания сессии
  11. Используйте expire_on_commit=False для предотвращения лишних запросов
  12. Избегайте N+1 запросов при загрузке связанных сущностей

Интеграция

Модуль интегрируется с:

  • SQLAlchemy ORM
  • Асинхронными сессиями SQLAlchemy
  • Репозиториями доступа к данным
  • Сервисным слоем приложения
  • Системой логирования

Ограничения

  • Требуется Python 3.8+
  • Зависит от SQLAlchemy с поддержкой асинхронности
  • Предполагает использование контекстных менеджеров

Дополнительные возможности

Кастомная реализация Unit of Work

from typing import Any
from contextlib import asynccontextmanager

class CustomUnitOfWork(AbstractUnitOfWork):
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None
        self.transaction = None
        self.users = CustomUsersRepository()

    async def __aenter__(self):
        self.connection = await create_connection(self.connection_string)
        self.transaction = self.connection.begin()
        return self

    async def _commit(self):
        await self.transaction.commit()

    async def rollback(self):
        if self.transaction:
            await self.transaction.rollback()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            await self.rollback()
        if self.connection:
            await self.connection.close()

Использование с FastAPI

from fastapi import Depends
from sqlalchemy.ext.asyncio import async_sessionmaker
from src.service_layer.uow import SqlAlchemyUnitOfWork

async def get_uow() -> SqlAlchemyUnitOfWork:
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    repo_factory = SQLAlchemyUsersRepositoryFactory(hasher)
    return SqlAlchemyUnitOfWork(session_factory, repo_factory)

@router.post("/users/{user_id}/update")
async def update_user(
    user_id: int,
    data: UserUpdate,
    uow: SqlAlchemyUnitOfWork = Depends(get_uow)
):
    async with uow:
        user = await uow.users.get_by_id(user_id)
        user.update(**data.dict())
        await uow.commit()
        return {"status": "success"}