Migrations¶
This guide covers managing schema migrations for zae-limiter infrastructure.
Overview¶
Schema migrations handle changes to the DynamoDB table structure when upgrading between major versions of zae-limiter. The migration framework provides:
- Version tracking - Infrastructure version stored in DynamoDB
- Compatibility checking - Automatic client/infrastructure compatibility validation
- Migration registry - Ordered list of migrations between versions
- Rollback support - Optional rollback functions for reversible changes
When Migrations Are Needed¶
| Change Type | Migration Required? | Version Bump |
|---|---|---|
| Add optional attribute | No | Patch (x.y.Z) |
| Add new index (GSI) | Yes | Minor (x.Y.0) |
| Change key structure | Yes | Major (X.0.0) |
| Remove attribute | Yes | Major (X.0.0) |
| Change attribute type | Yes | Major (X.0.0) |
Version Compatibility¶
zae-limiter uses semantic versioning with specific compatibility rules.
Compatibility Rules¶
Client 1.x.x + Schema 1.x.x = Compatible
Client 2.x.x + Schema 1.x.x = Incompatible (migration required)
Client 1.2.0 + Min Client 1.3.0 = Incompatible (upgrade client)
Major version mismatch: Always incompatible. Schema migration required before the client can operate.
Minor/patch version mismatch: Compatible. Client and infrastructure can operate together.
Minimum client version: Infrastructure can require a minimum client version. Older clients are rejected.
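These rules can be modeled with a few lines of version arithmetic. The sketch below is illustrative only (the real logic lives in `zae_limiter.version.check_compatibility`) and assumes plain `MAJOR.MINOR.PATCH` version strings:

```python
# Simplified model of the compatibility rules above; illustrative only,
# not the library's actual check_compatibility implementation.

def parse(version: str) -> tuple[int, int, int]:
    """Parse a plain MAJOR.MINOR.PATCH string into a comparable tuple."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch

def is_compatible(client: str, schema: str, min_client: str = "0.0.0") -> bool:
    """A client can operate iff the major versions match and the client
    meets the infrastructure's minimum client version."""
    if parse(client)[0] != parse(schema)[0]:
        return False  # Major mismatch: schema migration required
    return parse(client) >= parse(min_client)
```

For example, `is_compatible("1.2.0", "1.0.0")` is compatible, `is_compatible("2.0.0", "1.0.0")` is not (major mismatch), and `is_compatible("1.2.0", "1.0.0", "1.3.0")` is rejected by the minimum client version.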
Checking Compatibility¶
Use the CLI to check compatibility without modifying anything:
Output:
Compatibility Check
====================
Client: 1.2.0
Schema: 1.0.0
Lambda: 1.1.0
Result: COMPATIBLE (update available)
Lambda update available: 1.1.0 -> 1.2.0
Run 'zae-limiter upgrade' to update.
Viewing Version Information¶
zae-limiter version --name limiter --region us-east-1
Output:
zae-limiter Infrastructure Version
====================================
Client Version: 1.2.0
Schema Version: 1.0.0
Infra Schema: 1.0.0
Lambda Version: 1.1.0
Min Client Version: 0.0.0
Status: COMPATIBLE (Lambda update available)
Lambda update available: 1.1.0 -> 1.2.0
Run 'zae-limiter upgrade' to update Lambda.
Upgrading Infrastructure¶
For minor updates (Lambda code, no schema changes):
zae-limiter upgrade
For major version upgrades requiring schema migration, see Sample Migration: v2.0.0.
Schema Modification Approaches¶
When modifying the DynamoDB schema, prefer solutions that preserve backward compatibility.
Non-Breaking Changes (Preferred)¶
These changes don't require migrations:
Adding optional attributes:
# Old code works - attribute simply missing
item = {"PK": "ENTITY#123", "SK": "#META", "name": "test"}
# New code adds optional attribute
item = {"PK": "ENTITY#123", "SK": "#META", "name": "test", "tags": ["prod"]}
Using if_not_exists() for new nested structures:
# Initialize nested map only if missing
update_expression = "SET #data.#metrics = if_not_exists(#data.#metrics, :empty_map)"
Adding conditional logic for missing attributes:
# Handle missing attribute gracefully
metrics = item.get("data", {}).get("metrics", {})
request_count = metrics.get("requests", 0)
Breaking Changes (Major Version)¶
These changes require migrations and major version bumps:
- Changing partition or sort key structure
- Removing required attributes
- Changing attribute data types
- Renaming attributes
- Restructuring nested data
DynamoDB-Specific Considerations¶
GSI Changes:
- Adding a GSI: The index itself is added with a CloudFormation update; a data migration is needed only to backfill the new index key attributes onto existing items
- Removing a GSI: Requires ensuring no code depends on it
- Changing GSI keys: Requires data migration
Key Pattern Changes:
# v1.0.0 pattern
PK = f"ENTITY#{entity_id}"
SK = f"#BUCKET#{resource}#{limit_name}"
# v2.0.0 pattern (breaking change!)
PK = f"ENT#{entity_id}" # Changed prefix
SK = f"BKT#{resource}#{limit_name}" # Changed prefix
Key pattern changes require migrating all existing data.
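At its core, such a migration applies a pure key transformation to every item while the data is copied. A hypothetical helper (not part of zae-limiter) for the prefix change above:

```python
# Hypothetical helper illustrating the v1.0.0 -> v2.0.0 key rewrite above.
# A real migration would apply this while copying each item in DynamoDB.

def transform_keys_v1_to_v2(pk: str, sk: str) -> tuple[str, str]:
    """Map v1 key prefixes (ENTITY# / #BUCKET#) to v2 prefixes (ENT# / BKT#)."""
    if pk.startswith("ENTITY#"):
        pk = "ENT#" + pk[len("ENTITY#"):]
    if sk.startswith("#BUCKET#"):
        sk = "BKT#" + sk[len("#BUCKET#"):]
    return pk, sk
```

Note the function is idempotent: keys already in the v2 format pass through unchanged, which matters if the copy step is interrupted and re-run.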
Creating a Migration¶
Migration File Structure¶
Create a new file in src/zae_limiter/migrations/:
# src/zae_limiter/migrations/v1_1_0.py
"""
Migration: v1.1.0 (Add metrics tracking)

This migration adds a metrics attribute to entity metadata
for tracking request statistics.

Changes:
- Add 'metrics' map to entity #META records
- Initialize with empty counters
"""
from __future__ import annotations

from typing import TYPE_CHECKING

from . import Migration, register_migration

if TYPE_CHECKING:
    from ..repository import Repository


async def migrate_v1_1_0(repository: Repository) -> None:
    """
    Add metrics attribute to all entity metadata records.

    This is a forward-only migration that initializes the
    metrics structure for existing entities.
    """
    # Query all entity metadata records
    # Update each to add metrics if missing
    # Use conditional updates to be idempotent
    pass


async def rollback_v1_1_0(repository: Repository) -> None:
    """
    Remove metrics attribute from entity metadata.

    Note: This loses all collected metrics data.
    """
    # Remove metrics attribute from all entities
    pass


# Register the migration
register_migration(
    Migration(
        version="1.1.0",
        description="Add metrics tracking to entities",
        reversible=True,
        migrate=migrate_v1_1_0,
        rollback=rollback_v1_1_0,
    )
)
Migration Protocol¶
Migration functions must follow this protocol:
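A minimal sketch of the expected shape, inferred from the registration examples in this guide (the real definitions in `zae_limiter.migrations` may differ in detail):

```python
# Sketch of the Migration record, inferred from the registration examples
# in this guide; the actual definition lives in zae_limiter.migrations.
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

@dataclass
class Migration:
    version: str                                     # Target schema version
    description: str                                 # Human-readable summary
    reversible: bool                                 # Whether rollback is safe
    migrate: Callable[..., Awaitable[None]]          # async def migrate(repository)
    rollback: Optional[Callable[..., Awaitable[None]]] = None  # Optional undo
```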
Key requirements:
- Async: All migrations are async functions
- Repository parameter: Receives a configured Repository instance
- Idempotent: Safe to run multiple times (use conditional updates)
- Atomic where possible: Use transactions for related changes
Registering Migrations¶
Migrations are auto-registered when imported. Ensure your migration module is imported in migrations/__init__.py:
# src/zae_limiter/migrations/__init__.py
# ... existing code ...
# Import built-in migrations to register them
from . import v1_0_0 as _v1_0_0 # noqa: F401, E402
from . import v1_1_0 as _v1_1_0 # noqa: F401, E402 # Add new migration
Updating Schema Version¶
After adding a migration, update the current schema version:
# src/zae_limiter/version.py
# Current schema version - increment when schema changes
CURRENT_SCHEMA_VERSION = "1.1.0" # Updated from "1.0.0"
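Conceptually, an upgrade then applies every registered migration whose version falls in the half-open range (current, target], in ascending order. A simplified model of that selection (the real `get_migrations_between` may differ):

```python
# Simplified model of selecting migrations between two schema versions:
# pick every registered version v with current < v <= target, ascending.

def version_key(version: str) -> tuple[int, int, int]:
    """Turn 'MAJOR.MINOR.PATCH' into a sortable tuple."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch

def migrations_between(registered: list[str], current: str, target: str) -> list[str]:
    """Return registered versions in (current, target], sorted ascending."""
    selected = [
        v for v in registered
        if version_key(current) < version_key(v) <= version_key(target)
    ]
    return sorted(selected, key=version_key)
```

So upgrading from "1.0.0" to "2.0.0" with versions "1.1.0" and "2.0.0" registered would apply both, in that order.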
Validating Migrations¶
Unit Testing with Moto¶
Test migrations using moto for fast, isolated tests:
# tests/test_migrations.py
import pytest
from moto import mock_aws

from zae_limiter.migrations import get_migrations_between, apply_migrations
from zae_limiter.repository import Repository


@pytest.fixture
def mock_dynamodb():
    with mock_aws():
        # Create table and seed test data
        yield


@pytest.mark.asyncio
async def test_migration_v1_1_0(mock_dynamodb):
    """Test v1.1.0 migration adds metrics to entities."""
    repo = Repository("test_table", "us-east-1", None)

    # Create test entity without metrics
    await repo.create_entity(entity_id="test-1", name="Test Entity")

    # Apply migration
    applied = await apply_migrations(repo, "1.0.0", "1.1.0")
    assert applied == ["1.1.0"]

    # Verify metrics added - use raw DynamoDB query since Entity
    # dataclass won't have the new 'metrics' field until code is updated
    client = await repo._get_client()
    response = await client.get_item(
        TableName=repo.table_name,
        Key={"PK": {"S": "ENTITY#test-1"}, "SK": {"S": "#META"}},
    )
    item = response["Item"]
    assert "metrics" in item["data"]["M"]
    assert item["data"]["M"]["metrics"]["M"]["requests"]["N"] == "0"


@pytest.mark.asyncio
async def test_migration_idempotent(mock_dynamodb):
    """Test migration can be safely run multiple times."""
    repo = Repository("test_table", "us-east-1", None)

    # Create test entity
    await repo.create_entity(entity_id="test-1", name="Test Entity")

    # Run migration twice
    await apply_migrations(repo, "1.0.0", "1.1.0")
    await apply_migrations(repo, "1.0.0", "1.1.0")  # Should not fail

    # Verify single application via raw DynamoDB query
    client = await repo._get_client()
    response = await client.get_item(
        TableName=repo.table_name,
        Key={"PK": {"S": "ENTITY#test-1"}, "SK": {"S": "#META"}},
    )
    item = response["Item"]
    assert item["data"]["M"]["metrics"]["M"]["requests"]["N"] == "0"
Integration Testing with LocalStack¶
Test migrations against real AWS-compatible infrastructure:
# tests/test_migrations_integration.py
import os

import pytest

from zae_limiter.migrations import apply_migrations
from zae_limiter.repository import Repository

# Skip if LocalStack not available
pytestmark = pytest.mark.skipif(
    not os.environ.get("AWS_ENDPOINT_URL"),
    reason="LocalStack not available",
)


@pytest.mark.integration
@pytest.mark.asyncio
async def test_migration_with_localstack():
    """Test migration against LocalStack."""
    endpoint_url = os.environ["AWS_ENDPOINT_URL"]
    repo = Repository(
        "test_migrations",
        "us-east-1",
        endpoint_url,
    )

    # Deploy infrastructure
    # ... create table with CloudFormation ...

    # Seed test data
    # ... create entities ...

    # Apply migration
    applied = await apply_migrations(repo, "1.0.0", "1.1.0")

    # Verify changes persisted
    # ... assertions ...
Run integration tests:
# Start LocalStack (from project root)
docker compose up -d
# Set environment variables and run integration tests
export AWS_ENDPOINT_URL=http://localhost:4566
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1
pytest -m integration -v
Production Validation Checklist¶
Before running migrations in production:
- [ ] Backup: Enable Point-in-Time Recovery (PITR) or create on-demand backup
- [ ] Test: Run migration against production data copy
- [ ] Monitor: Set up CloudWatch alarms for errors
- [ ] Rollback plan: Document rollback procedure
- [ ] Maintenance window: Schedule during low-traffic period
- [ ] Communication: Notify stakeholders of potential downtime
# Create on-demand backup before migration
aws dynamodb create-backup \
--table-name ZAEL-limiter \
--backup-name "pre-migration-$(date +%Y%m%d)"
# Verify PITR is enabled
aws dynamodb describe-continuous-backups \
--table-name ZAEL-limiter
Rollback Strategies¶
Reversible Migrations¶
For migrations that can be safely undone:
register_migration(
    Migration(
        version="1.1.0",
        description="Add metrics tracking",
        reversible=True,  # Can be rolled back
        migrate=migrate_v1_1_0,
        rollback=rollback_v1_1_0,  # Rollback function
    )
)
Rollback removes or reverts the changes:
async def rollback_v1_1_0(repository: Repository) -> None:
    """Remove metrics attribute from all entities."""
    # Implementation to remove metrics attribute
    pass
Forward-Only Migrations¶
Some migrations cannot be reversed:
register_migration(
    Migration(
        version="2.0.0",
        description="Restructure key patterns",
        reversible=False,  # Cannot be rolled back
        migrate=migrate_v2_0_0,
        rollback=None,  # No rollback function
    )
)
Forward-only migrations typically involve:
- Data transformation with information loss
- Key structure changes
- Removing deprecated attributes
Emergency Rollback Procedures¶
If a migration fails or causes issues:
1. Stop the bleeding: halt the running migration and, if possible, pause clients writing to the table.
2. Restore from backup (if needed):
# Restore from PITR
aws dynamodb restore-table-to-point-in-time \
--source-table-name ZAEL-limiter \
--target-table-name ZAEL-limiter-restored \
--restore-date-time "2024-01-15T10:00:00Z"
3. Run rollback (if reversible):
from zae_limiter.migrations import get_migrations
from zae_limiter.repository import Repository


async def emergency_rollback():
    repo = Repository("ZAEL-limiter", "us-east-1", None)
    migrations = get_migrations()
    target_migration = next(m for m in migrations if m.version == "1.1.0")

    if target_migration.reversible and target_migration.rollback:
        await target_migration.rollback(repo)
        print("Rollback complete")
    else:
        print("Migration is not reversible - restore from backup")
4. Update version record:
await repo.set_version_record(
    schema_version="1.0.0",  # Reverted version
    lambda_version="1.0.0",
    updated_by="emergency_rollback",
)
Sample Migration: v2.0.0¶
This example demonstrates a complete migration scenario for a hypothetical v2.0.0 release that adds a new Global Secondary Index for querying entities by creation date.
Scenario¶
Goal: Add ability to query entities by creation timestamp for audit purposes.
Changes:
1. Add created_at attribute to entity metadata
2. Add GSI3 for querying by creation date
3. Backfill created_at for existing entities
Migration Implementation¶
# src/zae_limiter/migrations/v2_0_0.py
"""
Migration: v2.0.0 (Add creation timestamp tracking)

This migration adds a created_at timestamp to all entities and
creates a new GSI for querying entities by creation date.

Schema changes:
- Add 'created_at' attribute to entity #META records
- Add GSI3: GSI3PK=CREATED#{YYYY-MM}, GSI3SK=ENTITY#{id}

Breaking changes:
- Requires CloudFormation stack update for GSI3
- All queries using GSI3 require v2.0.0+ client

Rollback:
- This migration is NOT reversible (GSI removal loses query capability)
- Restore from backup if rollback needed
"""
from __future__ import annotations

import asyncio
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from . import Migration, register_migration

if TYPE_CHECKING:
    from ..repository import Repository


async def migrate_v2_0_0(repository: Repository) -> None:
    """
    Add created_at timestamp to all existing entities.

    Note: GSI3 must be added via CloudFormation update before
    running this migration. The migration only backfills data.

    Steps:
    1. Scan all entity metadata records
    2. Add created_at if missing (set to migration timestamp)
    3. Add GSI3 keys for index population
    """
    migration_time = datetime.now(timezone.utc).isoformat()
    migration_month = datetime.now(timezone.utc).strftime("%Y-%m")

    # Get DynamoDB client
    client = await repository._get_client()

    # Scan for all entity metadata records
    paginator = client.get_paginator("scan")
    async for page in paginator.paginate(
        TableName=repository.table_name,
        FilterExpression="begins_with(SK, :meta)",
        ExpressionAttributeValues={":meta": {"S": "#META"}},
    ):
        items = page.get("Items", [])

        # Process in batches of 25, comfortably under the 100-item
        # TransactWriteItems limit
        for i in range(0, len(items), 25):
            batch = items[i:i + 25]

            # Build batch update
            update_requests = []
            for item in batch:
                pk = item["PK"]["S"]
                sk = item["SK"]["S"]

                # Use conditional update - only if created_at missing
                update_requests.append({
                    "Update": {
                        "TableName": repository.table_name,
                        "Key": {"PK": {"S": pk}, "SK": {"S": sk}},
                        "UpdateExpression": (
                            "SET #created = if_not_exists(#created, :ts), "
                            "GSI3PK = if_not_exists(GSI3PK, :gsi3pk), "
                            "GSI3SK = if_not_exists(GSI3SK, :gsi3sk)"
                        ),
                        "ExpressionAttributeNames": {
                            "#created": "created_at",
                        },
                        "ExpressionAttributeValues": {
                            ":ts": {"S": migration_time},
                            ":gsi3pk": {"S": f"CREATED#{migration_month}"},
                            ":gsi3sk": {"S": pk},
                        },
                    }
                })

            # Execute the batch as a single transaction
            if update_requests:
                await client.transact_write_items(
                    TransactItems=update_requests
                )

            # Rate limiting to avoid throttling
            await asyncio.sleep(0.1)

    # Update version record
    await repository.set_version_record(
        schema_version="2.0.0",
        client_min_version="2.0.0",  # Require v2.0.0+ clients
        updated_by="migration:v2.0.0",
    )


# Register the migration
register_migration(
    Migration(
        version="2.0.0",
        description="Add creation timestamp tracking with GSI3",
        reversible=False,  # GSI changes are not easily reversible
        migrate=migrate_v2_0_0,
        rollback=None,
    )
)
CloudFormation Update¶
The GSI must be added before running the migration:
# Addition to cfn_template.yaml
Resources:
  RateLimitsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      # ... existing properties ...
      GlobalSecondaryIndexes:
        # ... existing GSIs ...
        - IndexName: GSI3
          KeySchema:
            - AttributeName: GSI3PK
              KeyType: HASH
            - AttributeName: GSI3SK
              KeyType: RANGE
          Projection:
            ProjectionType: ALL
      AttributeDefinitions:
        # ... existing attributes ...
        - AttributeName: GSI3PK
          AttributeType: S
        - AttributeName: GSI3SK
          AttributeType: S
Running the Migration¶
# 1. Create backup
aws dynamodb create-backup \
  --table-name ZAEL-limiter \
  --backup-name "pre-v2-migration-$(date +%Y%m%d)"

# 2. Update CloudFormation stack (adds GSI3)
aws cloudformation update-stack \
  --stack-name ZAEL-limiter \
  --template-body file://updated-template.yaml \
  --capabilities CAPABILITY_NAMED_IAM

# 3. Wait for GSI3 to become ACTIVE (the table-exists waiter only covers
# the table itself, so poll the index status until it reports ACTIVE)
aws dynamodb describe-table \
  --table-name ZAEL-limiter \
  --query "Table.GlobalSecondaryIndexes[?IndexName=='GSI3'].IndexStatus"

# 4. Install new client version
pip install zae-limiter==2.0.0

# 5. Run migration (via upgrade command or programmatically)
python -c "
import asyncio
from zae_limiter.migrations import apply_migrations
from zae_limiter.repository import Repository

async def run():
    repo = Repository('ZAEL-limiter', 'us-east-1', None)
    applied = await apply_migrations(repo, '1.0.0', '2.0.0')
    print(f'Applied migrations: {applied}')
    await repo.close()

asyncio.run(run())
"

# 6. Verify migration
zae-limiter version --name limiter --region us-east-1
# 6. Verify migration
zae-limiter version --name limiter --region us-east-1
Testing the Migration¶
@pytest.mark.asyncio
async def test_v2_migration_adds_created_at(mock_dynamodb):
    """Test v2.0.0 migration adds created_at to entities."""
    repo = Repository("test_table", "us-east-1", None)

    # Create entities without created_at (v1 schema)
    await repo.create_entity(entity_id="entity-1", name="Test 1")
    await repo.create_entity(entity_id="entity-2", name="Test 2")

    # Verify no created_at before migration
    entity = await repo.get_entity("entity-1")
    assert entity.created_at is None

    # Apply migration
    applied = await apply_migrations(repo, "1.0.0", "2.0.0")
    assert applied == ["2.0.0"]

    # Verify created_at added - check raw DynamoDB for GSI3 keys
    # (Entity dataclass has created_at, but GSI3PK/GSI3SK are schema-level)
    client = await repo._get_client()
    response = await client.get_item(
        TableName=repo.table_name,
        Key={"PK": {"S": "ENTITY#entity-1"}, "SK": {"S": "#META"}},
    )
    item = response["Item"]
    assert "created_at" in item
    assert item["created_at"]["S"].startswith("20")  # Valid ISO timestamp
    assert "GSI3PK" in item  # GSI key was added
    assert item["GSI3PK"]["S"].startswith("CREATED#")


@pytest.mark.asyncio
async def test_v2_migration_idempotent(mock_dynamodb):
    """Test v2.0.0 migration is idempotent."""
    repo = Repository("test_table", "us-east-1", None)

    # Create entity with created_at already set
    # (simulating an entity created after v2.0.0 code deployed)
    original_time = "2024-01-01T00:00:00Z"
    client = await repo._get_client()
    await client.put_item(
        TableName=repo.table_name,
        Item={
            "PK": {"S": "ENTITY#entity-1"},
            "SK": {"S": "#META"},
            "data": {"M": {"name": {"S": "Test"}}},
            "created_at": {"S": original_time},
            "GSI3PK": {"S": "CREATED#2024-01"},
            "GSI3SK": {"S": "ENTITY#entity-1"},
        },
    )

    # Apply migration - should not overwrite existing values
    await apply_migrations(repo, "1.0.0", "2.0.0")

    # Verify original created_at preserved (if_not_exists)
    response = await client.get_item(
        TableName=repo.table_name,
        Key={"PK": {"S": "ENTITY#entity-1"}, "SK": {"S": "#META"}},
    )
    item = response["Item"]
    assert item["created_at"]["S"] == original_time
Reference¶
Version Record Structure¶
The version record is stored in DynamoDB:
| Attribute | Value | Description |
|---|---|---|
| `PK` | `SYSTEM#` | Partition key |
| `SK` | `#VERSION` | Sort key |
| `schema_version` | `"1.0.0"` | Current schema version |
| `lambda_version` | `"1.2.0"` | Deployed Lambda version |
| `client_min_version` | `"1.0.0"` | Minimum client version |
| `updated_at` | ISO timestamp | Last update time |
| `updated_by` | `"cli:1.2.0"` | What performed the update |
Key Patterns (v1.0.0)¶
| Pattern | Example | Description |
|---|---|---|
| Entity metadata | `PK=ENTITY#123, SK=#META` | Entity configuration |
| Bucket state | `PK=ENTITY#123, SK=#BUCKET#gpt-4#rpm` | Token bucket state |
| Limit config | `PK=ENTITY#123, SK=#LIMIT#gpt-4#rpm` | Stored limit config |
| Usage snapshot | `PK=ENTITY#123, SK=#USAGE#gpt-4#2024-01-15` | Usage data |
| Version | `PK=SYSTEM#, SK=#VERSION` | Infrastructure version |
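The patterns above can be captured as small key-builder helpers. These are illustrative only; zae-limiter's internal builders may be named differently:

```python
# Illustrative builders for the v1.0.0 key patterns listed above.
# Hypothetical helpers, not part of the zae-limiter public API.

def entity_pk(entity_id: str) -> str:
    """Partition key for an entity."""
    return f"ENTITY#{entity_id}"

def bucket_sk(resource: str, limit_name: str) -> str:
    """Sort key for token bucket state."""
    return f"#BUCKET#{resource}#{limit_name}"

def usage_sk(resource: str, date: str) -> str:
    """Sort key for a daily usage snapshot."""
    return f"#USAGE#{resource}#{date}"
```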
Migration API Reference¶
from zae_limiter.migrations import (
    Migration,               # Migration dataclass
    register_migration,      # Register a migration
    get_migrations,          # Get all registered migrations
    get_migrations_between,  # Get migrations between versions
    apply_migrations,        # Apply migrations
)

from zae_limiter.version import (
    CURRENT_SCHEMA_VERSION,  # Current schema version constant
    parse_version,           # Parse version string
    check_compatibility,     # Check client/infra compatibility
    get_schema_version,      # Get current schema version
    InfrastructureVersion,   # Version info dataclass
    CompatibilityResult,     # Compatibility check result
)