Migrations

Whether you're starting a new project or already have a table schema in place, you'll eventually modify your TableBase schema and need that change reflected in your database. Instead of writing this logic by hand, we bundle a series of utilities that handle it for you in a safe and predictable way.

You'll typically want to call your migration commands from the command line, so we import a CLI library of choice and write some wrapper code on top of our utility functions. We start by defining our project name (the same one that's declared in your pyproject.toml file) alongside a helper function to get a connection to your database.

For a real project, you'll want to grab these values from environment variables.

# cli.py
import asyncio
import asyncpg
from click import command, option

from iceaxe import DBConnection
from iceaxe.migrations.cli import handle_generate, handle_apply, handle_rollback

from myproject import models # noqa: F401

PROJECT = "myproject"

async def get_connection():
    return DBConnection(
        await asyncpg.connect(
            host="localhost",
            port=5432,
            user="db_user",
            password="yoursecretpassword",
            database="your_db",
        )
    )
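As a rough sketch of the environment-variable approach, the helper below builds the keyword arguments for asyncpg.connect() from the environment. The DB_* variable names and the connection_settings function are hypothetical and not part of Iceaxe; adapt them to whatever your deployment defines.

```python
import os
from collections.abc import Mapping


def connection_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Build asyncpg.connect() keyword arguments from environment variables.

    The DB_* names are assumptions made for this example.
    """
    return {
        "host": env.get("DB_HOST", "localhost"),
        "port": int(env.get("DB_PORT", "5432")),
        "user": env.get("DB_USER", "db_user"),
        # No default for the password: raise KeyError if it is missing
        "password": env["DB_PASSWORD"],
        "database": env.get("DB_NAME", "your_db"),
    }
```

Inside get_connection you could then call asyncpg.connect(**connection_settings()) instead of hard-coding the values.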

Now we can write the CLI functions that will actually be called. Iceaxe takes care of the core logic, so our job here is just to expose it to the command line.

@command()
@option("--message", help="A message to attach to the migration")
def generate(message: str | None):
    async def _inner():
        await handle_generate(PROJECT, await get_connection(), message=message)
    asyncio.run(_inner())

@command()
def apply():
    async def _inner():
        await handle_apply(PROJECT, await get_connection())
    asyncio.run(_inner())

@command()
def rollback():
    async def _inner():
        await handle_rollback(PROJECT, await get_connection())
    asyncio.run(_inner())

Great, now just modify your pyproject.toml file to make these callable. If you're using uv, you can add the following to your project.scripts section:

[project.scripts]
migrate-generate = "myproject.cli:generate"
migrate-apply = "myproject.cli:apply"
migrate-rollback = "myproject.cli:rollback"

Generating migrations

Let's say we currently have a simple Employee table that's defined in our database. It has an auto-incrementing ID and a name.

Column Name    Data Type    Constraints
id             int          PRIMARY KEY
name           varchar

We want to add an age to each employee record, so we modify our TableBase schema to include this new field.

from iceaxe import TableBase, Field

class Employee(TableBase):
    id: int | None = Field(primary_key=True, default=None)
    name: str
    age: int  # new field

When we call our migrate-generate from the CLI, Iceaxe will do the following:

  • Use the database connection to introspect the current schema
  • Compare the current schema to the new schema, creating a migration pathway to convert one to the other
  • Generate a migration file that will add the age column to the employee table

$ uv run migrate-generate

Generating migration to current schema
New migration added: rev_1729278706.py

It places the following file in your myproject/migrations directory:

from iceaxe.migrations.migrator import Migrator
from iceaxe.migrations.migration import MigrationRevisionBase
from iceaxe.schemas.actions import ColumnType

class MigrationRevision(MigrationRevisionBase):
    """
    Migration auto-generated on 2024-10-18T12:11:46.941324.

    Context: None

    """
    up_revision: str = "1729278608"
    down_revision: str | None = None

    async def up(self, migrator: Migrator):
        await migrator.actor.add_column(table_name="employee", column_name="age", explicit_data_type=ColumnType.INTEGER, explicit_data_is_list=False, custom_data_type=None)
        await migrator.actor.add_not_null(table_name="employee", column_name="age")

    async def down(self, migrator: Migrator):
        await migrator.actor.drop_column(table_name="employee", column_name="age")

This file contains the logic to transform your current schema to the new schema (up) as well as to undo those changes and revert to the original schema (down). The down logic lets you roll back changes that break your application.
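To make the compare-and-generate idea more concrete, here is a toy sketch that diffs two column mappings and produces paired up and down action lists. It is only an illustration under simplified assumptions (columns as name-to-type dicts, a hypothetical diff_columns helper) and is not how Iceaxe implements its schema comparison.

```python
def diff_columns(current: dict, desired: dict):
    """Return (up, down) action lists that turn `current` into `desired`.

    Both arguments map column names to type names. This is a toy model.
    """
    up, down = [], []
    # Columns present only in the desired schema must be added
    for name, col_type in desired.items():
        if name not in current:
            up.append(("add_column", name, col_type))
            down.append(("drop_column", name))
    # Columns present only in the current schema must be dropped
    for name, col_type in current.items():
        if name not in desired:
            up.append(("drop_column", name))
            down.append(("add_column", name, col_type))
    # Undo the steps in the opposite order they were applied
    down.reverse()
    return up, down


current = {"id": "integer", "name": "varchar"}
desired = {"id": "integer", "name": "varchar", "age": "integer"}
up, down = diff_columns(current, desired)
```

For the Employee example, up contains a single add_column action for age and down contains the matching drop_column action.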

You can modify these migration functions to include additional logic, such as setting default values for the new column or migrating data from one column to another. For this up function we'd probably want to insert our known ages for our employees before the NOT NULL constraint is added.

async def up(self, migrator: Migrator):
    await migrator.actor.add_column(table_name="employee", column_name="age", explicit_data_type=ColumnType.INTEGER, explicit_data_is_list=False, custom_data_type=None)

    # Insert known ages for employees
    await migrator.db_connection.exec(
        ...
    )

    await migrator.actor.add_not_null(table_name="employee", column_name="age")

Column Type Changes and Autocast

When you change the data type of an existing column in your TableBase schema, Iceaxe will automatically generate a modify_column_type migration. By default, these auto-generated migrations include automatic type casting (autocast=True) to handle data conversion between incompatible types.

Automatic Type Conversion

Consider this scenario where you want to change a text field to store numeric data:

# Before
class Product(TableBase):
    id: int = Field(primary_key=True)
    price: str  # Originally stored as text

# After  
class Product(TableBase):
    id: int = Field(primary_key=True)
    price: int  # Now we want it as an integer

Iceaxe will generate a migration like this:

async def up(self, migrator: Migrator):
    await migrator.actor.modify_column_type(
        table_name="product", 
        column_name="price", 
        explicit_data_type=ColumnType.INTEGER,
        autocast=True  # Automatically added
    )

The autocast=True parameter tells Iceaxe to emit an appropriate USING clause so that PostgreSQL converts the existing data during the type change. This handles conversions like:

  • String to numeric types ("123" → 123)
  • String to boolean ("true" → True)
  • String to date/time types ("2023-01-01" → date(2023, 1, 1))
  • String to UUID, JSON, and other specialized types
  • Scalar to array types (42 → [42])
  • Custom enum conversions
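For intuition about what a USING clause looks like, the sketch below builds the kind of ALTER TABLE statement PostgreSQL accepts for a simple cast. The alter_type_sql helper is hypothetical, and the exact SQL Iceaxe emits for each conversion is not shown in this guide and may well differ.

```python
def alter_type_sql(table: str, column: str, new_type: str, autocast: bool = True) -> str:
    """Build an ALTER COLUMN ... TYPE statement, optionally with a USING cast.

    Hypothetical helper for illustration only; not part of Iceaxe.
    """
    sql = f'ALTER TABLE "{table}" ALTER COLUMN "{column}" TYPE {new_type}'
    if autocast:
        # USING tells PostgreSQL how to compute each new value from the old one
        sql += f' USING "{column}"::{new_type}'
    return sql


statement = alter_type_sql("product", "price", "integer")
```

Without the USING clause, PostgreSQL only applies conversions it can perform as an assignment cast, which is why a text-to-integer change needs one.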

Customizing Autocast Behavior

If you need to customize the type conversion behavior, you can manually edit the generated migration:

async def up(self, migrator: Migrator):
    # Normalize values that would not cast cleanly.
    # This has to run first, while the column is still text.
    await migrator.db_connection.conn.execute("""
        UPDATE product
        SET price = '0'
        WHERE price !~ '^[0-9]+$'
    """)

    # Then change the type. Pass autocast=False only when PostgreSQL can
    # convert between the two types without a USING clause.
    await migrator.actor.modify_column_type(
        table_name="product",
        column_name="price",
        explicit_data_type=ColumnType.INTEGER,
        autocast=True
    )

Supported Type Conversions

The autocast feature supports most common PostgreSQL type conversions:

From Type                  To Type             Example
VARCHAR/TEXT               INTEGER/BIGINT      "123" → 123
VARCHAR/TEXT               BOOLEAN             "true" → true
VARCHAR/TEXT               DATE/TIMESTAMP      "2023-01-01" → 2023-01-01
VARCHAR/TEXT               UUID                "550e8400-..." → UUID
VARCHAR/TEXT               JSON/JSONB          '{"key":"value"}' → JSON
VARCHAR/TEXT               Custom Enums        "ACTIVE" → enum value
Scalar Types               Array Types         42 → [42]
Compatible numeric types   Direct conversion   123 → 123.0

Migration Safety

Auto-generated migrations with autocast=True are designed to be safe for most common scenarios. However, you should always:

  1. Review generated migrations before applying them to production
  2. Test migrations on a copy of your production data first
  3. Backup your database before applying type-changing migrations
  4. Consider data validation - autocast will fail if data cannot be converted

If you have data that cannot be automatically converted (e.g., non-numeric strings when converting to integers), the migration will fail and you'll need to clean up the data first or provide custom conversion logic.
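One way to find problem rows ahead of time is to check a sample of values against the pattern you expect. The sketch below does this in plain Python for a text-to-integer change; the find_unconvertible helper is hypothetical, and the pattern is deliberately conservative (PostgreSQL's integer cast also accepts a leading plus sign and surrounding whitespace, and enforces range limits that this check ignores).

```python
import re

# Optional minus sign followed by digits only
INTEGER_PATTERN = re.compile(r"-?[0-9]+")


def find_unconvertible(values):
    """Return the non-NULL values that would not cast cleanly to an integer."""
    return [
        value
        for value in values
        if value is not None and not INTEGER_PATTERN.fullmatch(value)
    ]


bad_values = find_unconvertible(["123", "12.5", "abc", "-7", None])
```

The same idea can be run directly in the database, for example SELECT count(*) FROM product WHERE price !~ '^-?[0-9]+$', before you apply the migration.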

Applying migrations

When you're happy with your migration, you can apply it to your database by running the migrate-apply command. This command applies every unapplied migration in your migrations folder, up to the latest revision.

$ uv run migrate-apply

🚀 Applied 1729278608 in 0.02s
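Conceptually, each revision file points at the revision it builds on through down_revision, so the pending migrations form a chain. The following sketch walks such a chain from the currently applied revision. It is a simplified illustration with a hypothetical pending_revisions helper and made-up revision ids, not Iceaxe's actual resolution logic.

```python
from __future__ import annotations


def pending_revisions(revisions: dict[str, str | None], applied: str | None) -> list[str]:
    """List revisions still to apply, oldest first.

    `revisions` maps each up_revision to its down_revision.
    `applied` is the revision the database is currently at (None if empty).
    """
    # Index each revision by the one it builds on
    next_of = {down: up for up, down in revisions.items()}
    pending = []
    cursor = applied
    while cursor in next_of:
        cursor = next_of[cursor]
        pending.append(cursor)
    return pending


# Hypothetical revision ids: 100 is the first migration, 300 the latest
revisions = {"100": None, "200": "100", "300": "200"}
to_apply = pending_revisions(revisions, applied="100")
```

Here to_apply holds the two revisions that come after 100, in the order they would need to run.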

Undoing migrations

If you need to roll back a migration, you can run the migrate-rollback command. This undoes one migration at a time, starting with the most recent.

$ uv run migrate-rollback

🪃 Rolled back migration to None in 0.01s

Advanced Migration Patterns

While basic migrations work well for most scenarios, production databases with high traffic volumes require more careful approaches.

When to Disable Transactions

By default, Iceaxe wraps each migration in a database transaction. This is usually what you want - if something goes wrong, everything gets rolled back cleanly. But sometimes transactions can actually work against you.

Here's when you might want to set use_transaction = False on your migration:

Hot Tables with Heavy Write Traffic: If you're migrating a table that gets hundreds of writes per second, you can run into severe lock contention. PostgreSQL queues lock requests, so if your migration needs an ACCESS EXCLUSIVE lock and has to wait behind a long-running query, every query that arrives after it gets stuck waiting in line too.

Operations That Can't Run Inside a Transaction: Some PostgreSQL operations are not allowed inside a transaction block at all. Creating indexes concurrently is the classic example - Postgres will just throw an error if you try to do it inside a transaction block.

Very Long-Running Migrations: If your migration takes 30+ minutes to run, keeping a transaction open that long can cause problems for connection poolers and increase the chance of conflicts.

Here's how you disable transactions:

class MigrationRevision(MigrationRevisionBase):
    up_revision: str = "1729278608"
    down_revision: str | None = None
    
    use_transaction: bool = False  # No transaction wrapper
    
    async def up(self, migrator: Migrator):
        # This migration runs without a transaction
        await migrator.actor.create_index_concurrently(
            table_name="employee", 
            index_name="idx_employee_email",
            columns=["email"]
        )

⚠️ Warning: Disabling transactions means you lose rollback safety. If your migration fails halfway through, you'll need to manually clean up the partial changes. Only disable transactions when you're confident the migration will succeed or when the operations are naturally idempotent.

The NOT NULL Constraint Problem

Here's a scenario that bites a lot of people in production: You have a busy users table with millions of rows getting constant updates, and you want to add a new required field.

Don't do this on a hot table:

async def up(self, migrator: Migrator):
    # This will lock your entire table during validation!
    await migrator.actor.add_column(table_name="users", column_name="department", explicit_data_type=ColumnType.TEXT)
    await migrator.actor.add_not_null(table_name="users", column_name="department")

The problem: Adding NOT NULL requires PostgreSQL to scan the entire table to verify that no existing rows have NULL values. On a large table, this can take minutes and requires an ACCESS EXCLUSIVE lock that blocks all reads and writes.

Do this instead:

class MigrationRevision(MigrationRevisionBase):
    use_transaction: bool = False  # We'll handle this manually
    
    async def up(self, migrator: Migrator):
        # Step 1: Add the column as nullable (fast)
        await migrator.actor.add_column(
            table_name="users", 
            column_name="department", 
            explicit_data_type=ColumnType.TEXT
        )
        
        # Step 2: Add a CHECK constraint without validation (very fast)
        await migrator.raw_sql("""
            ALTER TABLE users 
            ADD CONSTRAINT users_department_not_null 
            CHECK (department IS NOT NULL) NOT VALID
        """)
        
        # Step 3: Backfill existing data (can be done in batches)
        await migrator.raw_sql("""
            UPDATE users 
            SET department = 'Engineering' 
            WHERE department IS NULL
        """)
        
        # Step 4: Validate the constraint (scans table but allows reads/writes)
        await migrator.raw_sql("""
            ALTER TABLE users VALIDATE CONSTRAINT users_department_not_null
        """)
        
        # Step 5 (optional): Convert to a real NOT NULL constraint
        # On PostgreSQL 12+ this skips the full table scan because the
        # validated CHECK constraint already proves there are no NULLs
        await migrator.raw_sql("""
            ALTER TABLE users ALTER COLUMN department SET NOT NULL
        """)
        
        # Step 6: Clean up the check constraint
        await migrator.raw_sql("""
            ALTER TABLE users DROP CONSTRAINT users_department_not_null
        """)

Why this works better:

  • Step 1 is instant - just adds metadata about the column
  • Step 2 takes an ACCESS EXCLUSIVE lock but only for milliseconds since no validation happens
  • Step 3 can be done in batches if needed, and you control the data being inserted
  • Step 4 scans the whole table but only needs a SHARE UPDATE EXCLUSIVE lock, so normal operations continue
  • Steps 5-6 are optional cleanup if you want a "real" NOT NULL constraint
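If you do batch the backfill in Step 3, one simple approach is to walk the primary key in fixed-size ranges. The generator below is a minimal sketch with a hypothetical id_batches helper; it yields half-open ranges that could be bound to a WHERE id >= $1 AND id < $2 clause.

```python
def id_batches(min_id: int, max_id: int, batch_size: int):
    """Yield (start, end) half-open id ranges covering min_id..max_id inclusive."""
    start = min_id
    while start <= max_id:
        # Clamp the final batch so it stops right after max_id
        end = min(start + batch_size, max_id + 1)
        yield start, end
        start = end


batches = list(id_batches(1, 2500, 1000))
```

Walking the whole id range, rather than stopping at the first batch that updates nothing, keeps the backfill correct when ids have gaps or some rows are already populated.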

Making Migrations Idempotent

The worst thing that can happen during a deployment is having a migration fail halfway through, leaving your database in an unknown state. Idempotent migrations are designed to be run multiple times safely.
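To see what "safe to run multiple times" means in practice, the sketch below runs the same migration twice and checks that the second run changes nothing. It uses Python's built-in sqlite3 module purely so the example is self-contained; that is an assumption made for illustration, and your real migrations run against PostgreSQL through Iceaxe.

```python
import sqlite3


def migrate(conn: sqlite3.Connection) -> None:
    """A toy migration that is safe to run more than once."""
    # IF NOT EXISTS makes the schema change a no-op on the second run
    conn.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users (email)")
    # The WHERE clause only touches rows that have not been migrated yet
    conn.execute("UPDATE users SET status = 'active' WHERE status IS NULL")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO users (email, status) VALUES (?, ?)",
    [("a@example.com", None), ("b@example.com", "suspended")],
)

query = "SELECT email, status FROM users ORDER BY id"
migrate(conn)
first_run = conn.execute(query).fetchall()
migrate(conn)  # Running again must not raise or change any data
second_run = conn.execute(query).fetchall()
```

Both runs leave the table in the same state, and the row that already had a status is never overwritten.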

Here are the key patterns:

Always check if changes already exist:

async def up(self, migrator: Migrator):
    # Check if column already exists before adding it
    result = await migrator.db_connection.conn.fetch("""
        SELECT column_name 
        FROM information_schema.columns 
        WHERE table_name = 'users' AND column_name = 'department'
    """)
    
    if not result:
        await migrator.actor.add_column(
            table_name="users", 
            column_name="department", 
            explicit_data_type=ColumnType.TEXT
        )

Use IF NOT EXISTS when possible:

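async def up(self, migrator: Migrator):
    # PostgreSQL has built-in idempotency for many operations.
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block,
    # so this migration needs use_transaction = False.
    await migrator.raw_sql("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email 
        ON users (email)
    """)

    # ADD CONSTRAINT has no IF NOT EXISTS form, so check pg_constraint first
    await migrator.raw_sql("""
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT 1 FROM pg_constraint
                WHERE conname = 'users_email_unique'
            ) THEN
                ALTER TABLE users
                ADD CONSTRAINT users_email_unique UNIQUE (email);
            END IF;
        END
        $$;
    """)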

Handle data migrations carefully:

async def up(self, migrator: Migrator):
    # For data migrations, use UPDATE WHERE conditions
    # that won't affect already-migrated rows
    await migrator.raw_sql("""
        UPDATE users 
        SET status = 'active' 
        WHERE status IS NULL  -- Only update unmigrated rows
    """)
    
    # Or insert only the rows that are still missing
    await migrator.raw_sql("""
        INSERT INTO user_preferences (user_id, theme) 
        SELECT id, 'dark' FROM users 
        WHERE id NOT IN (SELECT user_id FROM user_preferences)
    """)

Migration Deployment Strategies

For high-traffic applications, consider splitting complex migrations across multiple deployments:

Deploy 1: Add new columns, populate them

async def up(self, migrator: Migrator):
    await migrator.actor.add_column(table_name="orders", column_name="payment_method", explicit_data_type=ColumnType.TEXT)
    # Populate from existing data
    await migrator.raw_sql("UPDATE orders SET payment_method = 'credit_card' WHERE payment_method IS NULL")

Deploy 2: Add constraints once data is clean

async def up(self, migrator: Migrator):
    # Data is already populated, safe to add constraint
    await migrator.actor.add_not_null(table_name="orders", column_name="payment_method")

Deploy 3: Remove old columns after application is updated

async def up(self, migrator: Migrator):
    # Application code no longer references old_payment_field
    await migrator.actor.drop_column(table_name="orders", column_name="old_payment_field")

This "expand-migrate-contract" pattern keeps your application running throughout the entire process.

Monitoring Migration Performance

Long-running migrations can be scary in production. Here are some ways to keep tabs on what's happening:

import time

async def up(self, migrator: Migrator):
    conn = migrator.db_connection.conn

    # Log progress for long operations
    print("Starting user migration...")
    start_time = time.time()

    # Batch large updates to avoid holding locks too long.
    # Row locks are only released at commit, so batching helps most
    # when the migration runs with use_transaction = False.
    batch_size = 1000
    max_id = await conn.fetchval("SELECT COALESCE(MAX(id), 0) FROM users")
    offset = 0

    # Walk the full id range. Stopping at the first "UPDATE 0" would end
    # early on gaps in the ids or on batches that are already populated.
    while offset <= max_id:
        await conn.execute("""
            UPDATE users 
            SET last_login = created_at
            WHERE id >= $1 AND id < $2 AND last_login IS NULL
        """, offset, offset + batch_size)

        offset += batch_size
        elapsed = time.time() - start_time
        print(f"Processed ids below {offset} in {elapsed:.2f}s")

You can also query PostgreSQL's pg_stat_progress_create_index view to monitor index creation progress, or pg_locks to see what locks your migration is holding.
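As a starting point, the queries below read from those two views, and a small helper turns the block counters into a percentage. The constant names and the percent_done helper are hypothetical; pg_stat_progress_create_index is available from PostgreSQL 12 onward, and you would run these from a separate session (or through the migration's connection for the pg_locks query, which filters on the current backend).

```python
# Progress of any index build currently running on the server
INDEX_PROGRESS_SQL = """
    SELECT pid, phase, blocks_done, blocks_total, tuples_done, tuples_total
    FROM pg_stat_progress_create_index
"""

# Locks held or awaited by the session that runs this query
LOCKS_SQL = """
    SELECT locktype, relation::regclass AS relation, mode, granted
    FROM pg_locks
    WHERE pid = pg_backend_pid()
"""


def percent_done(done: int, total: int) -> float:
    """Convert a done/total counter pair into a percentage."""
    if total == 0:
        # Some phases do not report totals yet
        return 0.0
    return round(100.0 * done / total, 1)


progress = percent_done(250, 1000)
```

With asyncpg, each query can be passed to conn.fetch() and the blocks_done and blocks_total columns fed into percent_done.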