Перейти к содержанию

Миграции базы данных

Модуль migrations предоставляет функционал для управления версионированием схемы базы данных с использованием Alembic.

Особенности

  • Поддержка асинхронных операций с базой данных
  • Интеграция с настройками приложения
  • Автоматическая генерация миграций
  • Поддержка отката изменений
  • Работа с Vault для получения секретов

Документация API

env.py

Основной скрипт конфигурации миграций Alembic.

get_sync_uri() -> str

Обертка для асинхронной функции, чтобы можно было вызвать в sync-коде.

Source code in src/infrastructure/database/migrations/env.py
26
27
28
def get_sync_uri() -> str:
    """Обертка для асинхронной функции, чтобы можно было вызвать в sync-коде."""
    return asyncio.get_event_loop().run_until_complete(init_settings_and_get_uri())

init_settings_and_get_uri() -> str async

Подгружает секреты из Vault и возвращает PostgreSQL URI.

Source code in src/infrastructure/database/migrations/env.py
17
18
19
20
21
22
23
async def init_settings_and_get_uri() -> str:
    """Подгружает секреты из Vault и возвращает PostgreSQL URI."""
    loader = SettingsLoader()
    await loader.load()
    settings = get_settings()
    # Alembic не поддерживает async драйвер, поэтому убираем "+asyncpg"
    return settings.postgres_uri.replace('+asyncpg', '')

init_settings_and_get_uri

Загружает настройки и возвращает URI для подключения к БД.

Подгружает секреты из Vault и возвращает PostgreSQL URI.

Source code in src/infrastructure/database/migrations/env.py
17
18
19
20
21
22
23
async def init_settings_and_get_uri() -> str:
    """Подгружает секреты из Vault и возвращает PostgreSQL URI."""
    loader = SettingsLoader()
    await loader.load()
    settings = get_settings()
    # Alembic не поддерживает async драйвер, поэтому убираем "+asyncpg"
    return settings.postgres_uri.replace('+asyncpg', '')

Примеры использования

Создание новой миграции

alembic revision --autogenerate -m "Описание изменений"

Применение миграций

# Применить все новые миграции
alembic upgrade head

# Откатить последнюю миграцию
alembic downgrade -1

# Применить конкретную миграцию
alembic upgrade <revision>

Проверка текущей версии

alembic current

Рекомендации по использованию

  1. Разработка
  2. Всегда добавляйте осмысленные сообщения к миграциям
  3. Проверяйте сгенерированные миграции перед применением
  4. Используйте --autogenerate с осторожностью
  5. Храните миграции в системе контроля версий

  6. Безопасность

  7. Не включайте чувствительные данные в миграции
  8. Используйте переменные окружения для конфиденциальной информации
  9. Проверяйте SQL-запросы перед выполнением в production

  10. Производительность

  11. Для больших таблиц используйте batch_alter_table
  12. Добавляйте индексы для часто используемых запросов
  13. Избегайте блокирующих операций в транзакциях

Интеграция

Модуль интегрируется с:

  • SQLAlchemy ORM
  • Alembic
  • Vault для управления секретами
  • Настройками приложения
  • Системами логирования

Ограничения

  • Требуется Alembic 1.8+
  • Зависит от синхронного драйвера БД для выполнения миграций
  • Автоматическая генерация может потребовать ручной доработки

Дополнительные возможности

Кастомные шаблоны миграций

Шаблоны миграций можно настраивать через script.py.mako:

"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}


def upgrade():
    ${upgrades if upgrades else "pass"}


def downgrade():
    ${downgrades if downgrades else "pass"}

Обработка ошибок

try:
    # Выполнение миграции
    context.run_migrations()
except Exception as e:
    logger.error(f"Ошибка при выполнении миграции: {e}")
    raise

Расширенные сценарии

Миграция с обработкой данных

def upgrade():
    # Создание новой таблицы
    op.create_table(
        'new_table',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('data', sa.String()),
        sa.PrimaryKeyConstraint('id')
    )

    # Копирование данных из старой таблицы
    connection = op.get_bind()
    connection.execute("""
        INSERT INTO new_table (id, data)
        SELECT id, old_data FROM old_table
    """)

    # Удаление старой таблицы
    op.drop_table('old_table')

    # Переименование новой таблицы
    op.rename_table('new_table', 'old_table')

Условное выполнение миграции

def upgrade():
    inspector = inspect(engine)
    if 'old_table_name' in inspector.get_table_names():
        # Выполнить миграцию только если таблица существует
        op.drop_table('old_table_name')