Перейти к содержанию

Фабрики репозиториев

Модуль factory содержит классы-фабрики для создания и настройки репозиториев пользователей. Основная цель - инкапсулировать логику создания репозиториев и их зависимостей, что упрощает управление жизненным циклом объектов и внедрение зависимостей.

Документация API

ABCUsersRepositoryFactory

Bases: ABC

Абстрактная фабрика для создания репозитория пользователей.

Source code in src/adapters/factory.py
 9
10
11
12
13
14
15
class ABCUsersRepositoryFactory(abc.ABC):
    """Абстрактная фабрика для создания репозитория пользователей."""

    @abc.abstractmethod
    def create(self, session: AsyncSession) -> ABCUsersRepository:
        """Создаёт экземпляр репозитория пользователей."""
        raise NotImplementedError

create(session) abstractmethod

Создаёт экземпляр репозитория пользователей.

Source code in src/adapters/factory.py
12
13
14
15
@abc.abstractmethod
def create(self, session: AsyncSession) -> ABCUsersRepository:
    """Создаёт экземпляр репозитория пользователей."""
    raise NotImplementedError

SQLAlchemyUsersRepositoryFactory

Bases: ABCUsersRepositoryFactory

Source code in src/adapters/factory.py
18
19
20
21
22
23
class SQLAlchemyUsersRepositoryFactory(ABCUsersRepositoryFactory):
    def __init__(self, hasher: IPasswordHasher):
        self._hasher = hasher

    def create(self, session: AsyncSession) -> SQLAlchemyUsersRepository:
        return SQLAlchemyUsersRepository(session, self._hasher)

Пример использования

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from src.adapters.factory import SQLAlchemyUsersRepositoryFactory
from src.adapters.hasher import BcryptPasswordHasher

# Инициализация зависимостей
hasher = BcryptPasswordHasher()
engine = create_async_engine("postgresql+asyncpg://user:password@localhost/db")

# Создание экземпляра репозитория
async with AsyncSession(engine) as session:
    factory = SQLAlchemyUsersRepositoryFactory(hasher=hasher)
    repository = factory.create(session=session)

    # Использование репозитория
    user = await repository.get_user_by_email("user@example.com")