Перейти к содержанию

Движок базы данных

Модуль engine предоставляет функционал для работы с асинхронным подключением к базе данных с использованием SQLAlchemy.

Особенности

  • Ленивая инициализация движка БД
  • Управление пулом соединений
  • Обработка ошибок подключения
  • Кэширование фабрики сессий
  • Асинхронный API

Документация API

get_engine

Фабричная функция для создания и кэширования асинхронного движка SQLAlchemy.

Ленивая инициализация engine. Настройки должны быть загружены до вызова.

Returns:

Name Type Description
AsyncEngine AsyncEngine

Инициализированный асинхронный движок SQLAlchemy

Raises:

Type Description
DatabaseConnectionError

Если не удалось создать подключение к БД

Source code in src/infrastructure/database/engine.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@lru_cache
def get_engine() -> AsyncEngine:
    """Ленивая инициализация engine. Настройки должны быть загружены до вызова.

    Returns:
        AsyncEngine: Инициализированный асинхронный движок SQLAlchemy

    Raises:
        DatabaseConnectionError: Если не удалось создать подключение к БД

    """
    settings = get_settings()

    try:
        engine = create_async_engine(
            settings.postgres_uri,
            pool_size=5,
            max_overflow=10,
            pool_timeout=30,
            pool_pre_ping=True,
            pool_recycle=300,
            connect_args={'command_timeout': 10},
        )
        logger.info('Асинхронный движок БД успешно создан')
        return engine

    except sa_exceptions.TimeoutError as e:
        logger.exception('Таймаут подключения к БД')
        raise DatabaseTimeoutError(f'Таймаут подключения к БД: {str(e)}') from e
    except sa_exceptions.ArgumentError as e:
        logger.exception('Некорректные параметры подключения к БД:')
        raise DatabaseArgumentError(f'Некорректные параметры подключения к БД: {str(e)}') from e
    except sa_exceptions.SQLAlchemyError as e:
        logger.exception('Ошибка SQLAlchemy при создании движка')
        raise DatabaseConnectionError(f'Ошибка SQLAlchemy при создании движка: {str(e)}') from e
    except Exception as e:
        error_msg = f'Непредвиденная ошибка при создании движка БД: {str(e)}'
        logger.critical(error_msg, exc_info=True)
        raise DatabaseConnectionError(error_msg) from e

get_session_factory

Фабрика для создания асинхронных сессий с настройками по умолчанию.

Ленивая инициализация асинхронной session factory.

Source code in src/infrastructure/database/engine.py
63
64
65
66
67
68
69
70
@lru_cache
def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Ленивая инициализация асинхронной session factory."""
    engine = get_engine()
    return async_sessionmaker(
        bind=engine,
        expire_on_commit=False,
    )  # type: ignore

Примеры использования

Получение движка и создание сессии

from src.infrastructure.database.engine import get_engine, get_session_factory

# Получение движка (кэшируется при первом вызове)
engine = get_engine()

# Получение фабрики сессий (кэшируется при первом вызове)
session_factory = get_session_factory()

# Создание сессии
async with session_factory() as session:
    # Работа с базой данных
    result = await session.execute("SELECT 1")
    print(await result.scalar())

Рекомендации по использованию

  1. Инициализация
  2. Убедитесь, что настройки приложения загружены до первого вызова get_engine()
  3. Используйте get_engine() и get_session_factory() как фабричные функции
  4. Не создавайте несколько экземпляров движка в одном приложении

  5. Настройки подключения

  6. Настройте параметры пула соединений в соответствии с нагрузкой
  7. Используйте pool_pre_ping для обнаружения разорванных соединений
  8. Установите разумные таймауты для окружения

  9. Обработка ошибок

  10. Обрабатывайте специфические исключения SQLAlchemy
  11. Используйте контекстные менеджеры для управления сессиями
  12. Логируйте ошибки подключения

Интеграция

Модуль интегрируется с: - SQLAlchemy ORM - ASGI-приложениями - Системами кэширования (через lru_cache) - Системами логирования

Ограничения

  • Требуется SQLAlchemy 1.4+
  • Зависит от асинхронного драйвера БД (например, asyncpg)
  • Кэширование движка и фабрики сессий может быть нежелательным в тестовом окружении

Дополнительные возможности

Кастомные настройки движка

from sqlalchemy.ext.asyncio import create_async_engine

def create_custom_engine(url: str) -> AsyncEngine:
    return create_async_engine(
        url,
        pool_size=10,
        max_overflow=20,
        pool_timeout=30,
        pool_pre_ping=True,
        pool_recycle=3600,
        connect_args={'command_timeout': 15},
    )

Обработка ошибок подключения

from src.infrastructure.database.exceptions import (
    DatabaseConnectionError,
    DatabaseTimeoutError,
    DatabaseArgumentError
)

try:
    engine = get_engine()
except DatabaseTimeoutError as e:
    # Обработка таймаута подключения
    logger.error(f"Таймаут подключения к БД: {e}")
    raise