Repository

The Repository class owns all DynamoDB data access and infrastructure management. RateLimiter delegates data operations to the repository while owning business logic.

Preferred Usage Pattern

from zae_limiter import RateLimiter, Repository

# Open repository (auto-provisions if needed, recommended)
repo = await Repository.open()
limiter = RateLimiter(repository=repo)

# For explicit infrastructure provisioning, use builder:
repo = await Repository.builder().build()
limiter = RateLimiter(repository=repo)

See ADR-108 for the design rationale.

Repository

Repository

Repository(name, region=None, endpoint_url=None, stack_options=None, config_cache_ttl=60, *, _skip_deprecation_warning=False)

Async DynamoDB repository for rate limiter data.

Handles all DynamoDB operations including entities, buckets, limit configs, and transactions.

Use open() to open a repository (recommended), or builder() for custom infrastructure options (permission boundaries, IAM config).

Deprecated: Direct construction via Repository(...) will be removed in v1.0.0. Use Repository.open(...) or Repository.builder().build() instead.

Parameters:

Name Type Description Default
name str

Resource identifier (e.g., "my-app"). Used as the CloudFormation stack_name and DynamoDB table_name.

required
region str | None

AWS region (e.g., "us-east-1").

None
endpoint_url str | None

Custom endpoint URL (e.g., LocalStack).

None
stack_options StackOptions | None

Configuration for CloudFormation infrastructure. Pass StackOptions to enable declarative infrastructure management.

None
config_cache_ttl int

TTL in seconds for config cache (default: 60, 0 to disable). Controls caching of resolved limit configs in resolve_limits().

60

Example

# Most users (auto-provisions if needed)
repo = await Repository.open("my-app")

# Custom infrastructure options
repo = await Repository.builder().namespace("my-app").build()

namespace_name property

namespace_name

The human-readable namespace name.

namespace_id property

namespace_id

The opaque namespace ID used in DynamoDB keys.

capabilities property

capabilities

Declare which extended features this backend supports.

builder classmethod

builder()

Create a RepositoryBuilder for fluent configuration.

For most use cases, prefer open() instead. Use builder() when you need custom infrastructure options (permission boundaries, Lambda config, IAM role naming).

Stack defaults mirror open(): ZAEL_STACK env var or "zae-limiter", namespace "default".

Example

repo = await (
    Repository.builder()
    .namespace("my-app")
    .lambda_memory(512)
    .build()
)

open async classmethod

open(namespace=None, *, stack=None, region=None, endpoint_url=None, config_cache_ttl=60, auto_update=True)

Open a repository, auto-provisioning infrastructure if needed.

This is the recommended entry point for most applications. Namespace is the primary parameter — stack name defaults to "zae-limiter" and is rarely needed.

Auto-provision behavior:

  • If the DynamoDB table doesn't exist, deploys a new stack with default options (aggregator enabled).
  • If the table exists but the namespace isn't registered, registers it.
  • The "default" namespace is always registered.
  • Version check and Lambda auto-update run on every call.

For custom infrastructure options (permission boundaries, Lambda config, IAM role naming), use Repository.builder() instead.

Stack resolution: stack arg → ZAEL_STACK env var → "zae-limiter".

Namespace resolution: namespace arg → ZAEL_NAMESPACE env var → "default".

Parameters:

Name Type Description Default
namespace str | None

Namespace to open. Defaults to ZAEL_NAMESPACE env var or "default".

None
stack str | None

Stack name. Defaults to ZAEL_STACK env var or "zae-limiter".

None
region str | None

AWS region (e.g., "us-east-1").

None
endpoint_url str | None

Custom endpoint URL (e.g., LocalStack).

None
config_cache_ttl int

Config cache TTL in seconds (default: 60, 0 to disable).

60
auto_update bool

Auto-update Lambda on version mismatch (default: True).

True

Returns:

Type Description
Repository

Fully initialized Repository ready for use.

Raises:

Type Description
IncompatibleSchemaError

If schema migration is required.

VersionMismatchError

If auto_update is False and versions differ.

Example

# Most users
repo = await Repository.open("my-app")

# Multi-tenant
repo_alpha = await Repository.open("tenant-alpha")

# Explicit stack
repo = await Repository.open("my-app", stack="custom-stack")

# Simplest (stack="zae-limiter", namespace="default")
repo = await Repository.open()

namespace async

namespace(name, *, on_unavailable=None, bucket_ttl_multiplier=None)

Return a scoped Repository for the given namespace.

The scoped repo shares the DynamoDB client, entity cache, and namespace cache with the parent, but has its own ConfigCache and namespace identity. Calling close() on a scoped repo is a no-op (it does not close the shared client).

Parameters:

Name Type Description Default
name str

Namespace name to resolve.

required
on_unavailable OnUnavailableAction | None

Override on_unavailable behavior for this namespace ("allow" or "block"). Persisted via set_system_defaults().

None
bucket_ttl_multiplier int | None

Override bucket TTL multiplier for this scoped repo. Defaults to the parent's value.

None

Returns:

Type Description
Repository

A new Repository scoped to the resolved namespace.

Raises:

Type Description
NamespaceNotFoundError

If the namespace is not registered.

close async

close()

Close the DynamoDB client.

No-op for scoped repos (created via namespace()).

create_table async

create_table()

Create the DynamoDB table if it doesn't exist.

delete_table async

delete_table()

Delete the DynamoDB table.

delete_stack async

delete_stack()

Delete the CloudFormation stack and all associated resources.

Permanently removes the stack including DynamoDB table, Lambda aggregator, IAM roles, and CloudWatch log groups. Waits for deletion to complete. No-op if stack doesn't exist.

Raises:

Type Description
StackCreationError

If deletion fails.

ensure_infrastructure async

ensure_infrastructure()

Ensure DynamoDB infrastructure exists.

Deprecated: Use Repository.builder(...).build() instead, which handles infrastructure creation during the build step.

Creates CloudFormation stack using stack_options passed to the constructor. No-op if stack_options was not provided.

Raises:

Type Description
StackCreationError

If CloudFormation stack creation fails

create_stack async

create_stack(stack_options=None)

Create DynamoDB infrastructure via CloudFormation.

Deprecated since 0.6.0: Use ensure_infrastructure() instead. Pass stack_options to the Repository constructor. Will be removed in v1.0.0.

Parameters:

Name Type Description Default
stack_options StackOptions | None

Configuration for CloudFormation stack. If None, uses the stack_options passed to the constructor.

None

Raises:

Type Description
StackCreationError

If CloudFormation stack creation fails

register_namespace async

register_namespace(namespace)

Register a namespace in the registry (idempotent).

Creates forward (name -> ID) and reverse (ID -> name) mappings. Idempotent: returns existing ID if namespace already registered.

Parameters:

Name Type Description Default
namespace str

Namespace name to register.

required

Returns:

Type Description
str

The namespace_id (either newly created or existing).

Raises:

Type Description
ValueError

If namespace is the reserved namespace "_".

register_namespaces async

register_namespaces(namespaces)

Bulk-register multiple namespaces.

Registers all namespaces in DynamoDB (forward + reverse records each).

Parameters:

Name Type Description Default
namespaces list[str]

List of namespace names to register.

required

Returns:

Type Description
dict[str, str]

Mapping of {name: namespace_id} for all namespaces.

Raises:

Type Description
ValueError

If any namespace is the reserved namespace "_".

get_namespace async

get_namespace(namespace)

Get details for a single namespace by name.

Parameters:

Name Type Description Default
namespace str

Namespace name to look up.

required

Returns:

Type Description
dict[str, str] | None

Dict with {name, namespace_id, status, created_at}, or None if not found.

list_namespaces async

list_namespaces()

List all active namespaces with their IDs.

Performs a Query on PK = "_/SYSTEM#" with SK begins_with "#NAMESPACE#" (forward records only).

Returns:

Type Description
list[dict[str, str]]

List of {name, namespace_id, created_at} dicts.

delete_namespace async

delete_namespace(namespace)

Soft-delete a namespace. O(1) for data plane.

Removes the forward record and marks the reverse record as status="deleted". Data items are NOT deleted — they remain orphaned under the namespace's random ID prefix.

No-op if the namespace does not exist.

Parameters:

Name Type Description Default
namespace str

Namespace name to delete.

required

Raises:

Type Description
ValueError

If namespace is the reserved namespace "_".

recover_namespace async

recover_namespace(namespace_id)

Recover a deleted namespace by its ID.

Reads the reverse record to find the original name, re-creates the forward record, and marks the reverse record as active.

Parameters:

Name Type Description Default
namespace_id str

Opaque namespace ID to recover.

required

Returns:

Type Description
str

The recovered namespace name.

Raises:

Type Description
EntityNotFoundError

If the reverse record does not exist.

ValueError

If the reverse record has status="purging" or status="active".

list_orphan_namespaces async

list_orphan_namespaces()

List deleted namespaces with orphaned data.

Queries reverse records (SK begins_with "#NSID#") and filters for status="deleted".

Returns:

Type Description
list[dict[str, str]]

List of {namespace_id, namespace, deleted_at} dicts.

purge_namespace async

purge_namespace(namespace_id)

Purge all orphaned data for a deleted namespace.

Verifies the namespace is in "deleted" status, transitions to "purging", queries GSI4 to find all items, deletes them in batches, then removes the reverse record.

Safe to call on a non-existent namespace_id (no-op).

Parameters:

Name Type Description Default
namespace_id str

Opaque namespace ID to purge.

required

Raises:

Type Description
ValueError

If the namespace is "active" (cannot purge an active namespace).

create_entity async

create_entity(entity_id, name=None, parent_id=None, cascade=False, metadata=None, principal=None)

Create a new entity.

Parameters:

Name Type Description Default
entity_id str

Unique identifier for the entity

required
name str | None

Optional display name (defaults to entity_id)

None
parent_id str | None

Optional parent entity ID (for hierarchical limits)

None
cascade bool

If True, acquire() will also consume from parent entity

False
metadata dict[str, str] | None

Optional key-value metadata

None
principal str | None

Caller identity for audit logging

None

Returns:

Type Description
Entity

The created Entity

Raises:

Type Description
InvalidIdentifierError

If entity_id or parent_id is invalid

EntityExistsError

If entity already exists

get_entity async

get_entity(entity_id)

Get an entity by ID.

delete_entity async

delete_entity(entity_id, principal=None)

Delete an entity and all its related records.

Parameters:

Name Type Description Default
entity_id str

ID of the entity to delete

required
principal str | None

Caller identity for audit logging

None

get_children async

get_children(parent_id)

Get all children of a parent entity.

get_bucket async

get_bucket(entity_id, resource, limit_name, shard_id=0)

Get a single limit's bucket from the composite item.

get_buckets async

get_buckets(entity_id, resource=None, shard_id=0)

Get all buckets for an entity, optionally filtered by resource.

With pre-shard buckets (v0.9.0+), each item lives on its own partition key PK={ns}/BUCKET#{id}#{resource}#{shard}. When resource is specified, fetches the single bucket at the given shard_id. When resource is None, uses GSI3 (KEYS_ONLY) to discover all bucket PKs, then BatchGetItem to fetch full items.

The internal wcu infrastructure limit is filtered from the returned bucket states.

Parameters:

Name Type Description Default
entity_id str

Entity to query buckets for.

required
resource str | None

Resource name filter, or None for all resources.

None
shard_id int

Shard index (used only when resource is specified).

0

Returns:

Type Description
list[BucketState]

List of BucketState objects (one per application limit).
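The per-shard key layout described above can be illustrated with a small helper (illustrative only; the real key encoding lives inside the library):

```python
def bucket_pk(namespace_id: str, entity_id: str, resource: str, shard_id: int) -> str:
    # Mirrors the documented shape: PK={ns}/BUCKET#{id}#{resource}#{shard}.
    return f"{namespace_id}/BUCKET#{entity_id}#{resource}#{shard_id}"

pk = bucket_pk("ns01", "user-1", "gpt-4", 0)
# -> "ns01/BUCKET#user-1#gpt-4#0"
```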

batch_get_buckets async

batch_get_buckets(keys)

Batch get composite buckets in a single DynamoDB call.

With composite items, each (entity_id, resource, shard) is a single DynamoDB item containing all limits. Returns individual BucketStates keyed by (entity_id, resource, limit_name) for backward compatibility.

Parameters:

Name Type Description Default
keys list[tuple[str, str]]

List of (entity_id, resource) tuples. Uses shard_id=0.

required

Returns:

Type Description
dict[tuple[str, str, str], BucketState]

Dict mapping (entity_id, resource, limit_name) to BucketState. Missing composite items are not included in the result.

Note

DynamoDB BatchGetItem supports up to 100 items per request. For larger batches, this method automatically chunks the requests.
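The chunking behavior is straightforward to sketch (an illustration, not the library's internal code):

```python
def chunk_keys(keys: list, size: int = 100) -> list:
    # BatchGetItem caps a request at 100 keys, so a larger batch is
    # split into consecutive chunks of at most `size` keys each.
    return [keys[i:i + size] for i in range(0, len(keys), size)]

# 250 keys -> three requests of 100, 100, and 50 keys.
batches = chunk_keys(list(range(250)))
```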

batch_get_entity_and_buckets async

batch_get_entity_and_buckets(entity_id, bucket_keys)

Fetch entity metadata and composite buckets in a single BatchGetItem.

With composite items, each (entity_id, resource) pair is a single DynamoDB item. Includes the entity's #META record alongside bucket records to avoid a separate get_entity() round trip.

Parameters:

Name Type Description Default
entity_id str

Entity whose META record to include

required
bucket_keys list[tuple[str, str]]

List of (entity_id, resource) for composite buckets

required

Returns:

Type Description
tuple[Entity | None, dict[tuple[str, str, str], BucketState]]

Tuple of (entity_or_none, bucket_dict), where bucket_dict maps (entity_id, resource, limit_name) to BucketState.

Note

DynamoDB BatchGetItem supports up to 100 items per request. The META key counts toward that limit.

batch_get_configs async

batch_get_configs(keys)

Batch get config items in a single DynamoDB call.

Fetches config records (entity, resource, system level) in a single BatchGetItem request and returns deserialized limits.

Parameters:

Name Type Description Default
keys list[tuple[str, str]]

List of (PK, SK) tuples identifying config items

required

Returns:

Type Description
dict[tuple[str, str], tuple[list[Limit], OnUnavailableAction | None]]

Dict mapping (PK, SK) to (limits, on_unavailable) tuples. on_unavailable is extracted from system config items (None for others). Missing items are not included in the result.

Note

DynamoDB BatchGetItem supports up to 100 items per request. For larger batches, this method automatically chunks the requests. Uses eventually consistent reads (0.5 RCU per item).

get_or_create_bucket async

get_or_create_bucket(entity_id, resource, limit)

Get an existing bucket or create a new one with the given limit.

If the bucket exists, it is returned. If not, a new bucket is created with capacity set to the limit's capacity.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name (e.g., "gpt-4")

required
limit Limit

Limit configuration for the bucket

required

Returns:

Type Description
BucketState

Existing or newly created BucketState

build_bucket_put_item

build_bucket_put_item(state, ttl_seconds=86400)

Build a PutItem for a composite bucket (for use in transactions).

Wraps build_composite_create for backward compatibility with protocol.

build_bucket_update_item

build_bucket_update_item(entity_id, resource, limit_name, new_tokens_milli, new_last_refill_ms, expected_tokens_milli=None, shard_id=0)

Build an UpdateItem for a single limit in a composite bucket.

Legacy method — prefer build_composite_normal/retry/adjust for composite writes. This updates one limit's tk within the composite item.

build_composite_create

build_composite_create(entity_id, resource, states, now_ms, ttl_seconds=86400, cascade=False, parent_id=None, shard_id=0, shard_count=1)

Build a PutItem for creating a new composite bucket.

Used on first acquire for an entity+resource. Condition ensures no concurrent creation race (attribute_not_exists).

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
states list[BucketState]

BucketState objects for each limit

required
now_ms int

Current timestamp in milliseconds

required
ttl_seconds int | None

TTL in seconds from now, or None to omit TTL

86400
cascade bool

Whether the entity has cascade enabled

False
parent_id str | None

The entity's parent_id (if any)

None
shard_id int

Shard index for this bucket (default 0)

0
shard_count int

Total number of shards (default 1)

1

build_composite_normal

build_composite_normal(entity_id, resource, consumed, refill_amounts, now_ms, expected_rf, ttl_seconds=None, shard_id=0)

Build an UpdateItem for the normal write path (ADR-115 path 2).

ADD tk:(refill - consumed), tc:consumed for each limit. SET rf:now. CONDITION rf = :expected.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
consumed dict[str, int]

Amount consumed per limit (millitokens)

required
refill_amounts dict[str, int]

Refill amount per limit (millitokens)

required
now_ms int

Current timestamp in milliseconds

required
expected_rf int

Expected refill timestamp for optimistic lock

required
ttl_seconds int | None

TTL behavior:

  • None: don't change TTL
  • 0: REMOVE ttl (entity has custom limits)
  • >0: SET ttl to (now + ttl_seconds)

None
shard_id int

Shard index for this bucket (default 0)

0

build_composite_retry

build_composite_retry(entity_id, resource, consumed, shard_id=0)

Build an UpdateItem for the retry write path (ADR-115 path 3).

Lost optimistic lock — skip refill, only consume. ADD tk:(-consumed), tc:consumed for each limit. CONDITION: tk >= consumed per limit (prevent negative on acquire).

build_composite_adjust

build_composite_adjust(entity_id, resource, deltas, shard_id=0)

Build an UpdateItem for the adjust write path (ADR-115 path 4).

Unconditional ADD for post-hoc correction. Can go negative by design. Positive delta = consumed more (subtract tokens, add to counter). Negative delta = consumed less (add tokens, subtract from counter).
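The token arithmetic behind the normal, retry, and adjust paths above can be sketched as pure functions (a simplification, assuming tk is the token balance and tc the consumption counter, both in millitokens; the real methods build DynamoDB UpdateItem expressions):

```python
def normal(tk: int, tc: int, consumed: int, refill: int) -> tuple[int, int]:
    # Path 2: refill and consumption combined in one ADD
    # (tk += refill - consumed, tc += consumed).
    return tk + (refill - consumed), tc + consumed

def retry(tk: int, tc: int, consumed: int) -> tuple[int, int]:
    # Path 3: lost the optimistic lock; consume without refilling,
    # guarded so the balance cannot go negative on acquire.
    if tk < consumed:
        raise ValueError("conditional check failed: tk < consumed")
    return tk - consumed, tc + consumed

def adjust(tk: int, tc: int, delta: int) -> tuple[int, int]:
    # Path 4: unconditional post-hoc correction; may go negative.
    # Positive delta = consumed more -> subtract tokens.
    return tk - delta, tc + delta
```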

transact_write async

transact_write(items)

Execute a write, using the single-item API when possible to halve WCU cost.

write_each async

write_each(items)

Write items independently without cross-item atomicity (1 WCU each).

Each item is dispatched as a single PutItem, UpdateItem, or DeleteItem call. Use for unconditional writes (e.g., ADD adjustments) where partial success is acceptable.

speculative_consume async

speculative_consume(entity_id, resource, consume, ttl_seconds=None, shard_id=None)

Attempt speculative UpdateItem with condition check.

Checks entity cache for cascade metadata. If cache hit + cascade, issues child+parent UpdateItems concurrently via asyncio.gather and returns nested parent_result.

When shard_id is explicitly provided, targets that shard directly without cascade logic (used for shard retry).

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
consume dict[str, int]

Amount per limit (tokens, not milli)

required
ttl_seconds int | None

TTL in seconds from now, or None for no TTL change

None
shard_id int | None

Explicit shard to target (skips random selection and cascade logic). None means auto-select from entity cache.

None

Returns:

Type Description
SpeculativeResult

SpeculativeResult with:

  • On cache hit + cascade + both succeed: parent_result populated
  • On cache miss or non-cascade: parent_result is None
  • On failure: old_buckets from ALL_OLD (or None if bucket missing)

bump_shard_count async

bump_shard_count(entity_id, resource, current_count)

Double shard_count on shard 0 via conditional write.

Shard 0 is the source of truth for shard_count. The conditional expression shard_count = :old prevents double-bumping when multiple clients race to double concurrently. Also updates the entity cache with the new shard_count.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket.

required
resource str

Resource name.

required
current_count int

Current shard_count to double.

required

Returns:

Type Description
int

The new shard_count (doubled), or the current value if another client already doubled (ConditionalCheckFailedException).
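The conditional doubling described above behaves like a compare-and-set. An in-memory sketch (illustrative only; DynamoDB enforces the shard_count = :old condition server-side):

```python
def bump(store: dict, key, current_count: int) -> int:
    # Only a caller whose current_count matches the stored value
    # doubles it; a racer with a stale count observes the winner's.
    if store.get(key, 1) == current_count:
        store[key] = current_count * 2
    return store.get(key, 1)

store = {("user-1", "gpt-4"): 2}
first = bump(store, ("user-1", "gpt-4"), 2)   # wins the race: doubles to 4
second = bump(store, ("user-1", "gpt-4"), 2)  # stale racer: just reads 4
```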

set_limits async

set_limits(entity_id, limits, resource=DEFAULT_RESOURCE, principal=None)

Store limit configs for an entity (composite format, ADR-114).

All limits for an entity+resource are stored in a single composite item with SK '#CONFIG#{resource}'. This reduces cache-miss cost from N GetItem calls to 1 GetItem call.

Parameters:

Name Type Description Default
entity_id str

ID of the entity

required
limits list[Limit]

List of Limit configurations to store

required
resource str

Resource name (defaults to "default")

DEFAULT_RESOURCE
principal str | None

Caller identity for audit logging

None

reconcile_bucket_to_defaults async

reconcile_bucket_to_defaults(entity_id, resource, effective_limits, stale_limit_names=None)

Reconcile bucket to effective defaults after config deletion (issue #327).

Updates limit fields (cp, ra, rp) to match the new effective limits, sets TTL (since entity is now on defaults), and removes stale limit attributes that no longer exist in the effective config.

No-op if bucket doesn't exist (uses attribute_exists(PK) condition).

Parameters:

Name Type Description Default
entity_id str

ID of the entity

required
resource str

Resource name

required
effective_limits list[Limit]

The new effective limits (resource/system defaults)

required
stale_limit_names set[str] | None

Limit names to REMOVE from bucket (limits that were in the deleted entity config but not in effective defaults)

None

get_limits async

get_limits(entity_id, resource=DEFAULT_RESOURCE)

Get stored limit configs for an entity (composite format, ADR-114).

Reads a single composite item with SK '#CONFIG#{resource}' containing all limits for the entity+resource pair.

delete_limits async

delete_limits(entity_id, resource=DEFAULT_RESOURCE, principal=None)

Delete stored limit configs for an entity (composite format, ADR-114).

Deletes the single composite config item for this entity+resource.

Parameters:

Name Type Description Default
entity_id str

ID of the entity

required
resource str

Resource name (defaults to "default")

DEFAULT_RESOURCE
principal str | None

Caller identity for audit logging

None

list_entities_with_custom_limits async

list_entities_with_custom_limits(resource, limit=None, cursor=None)

List all entities that have custom limit configurations for a resource.

Uses GSI3 sparse index for efficient queries. Only entity-level configs have GSI3 attributes, so this query returns only entities with custom limits (not system or resource defaults).

Parameters:

Name Type Description Default
resource str

Resource to filter by (required).

required
limit int | None

Maximum number of entities to return. None for all.

None
cursor str | None

Pagination cursor from previous call. None for first page.

None

Returns:

Type Description
tuple[list[str], str | None]

Tuple of (entity_ids, next_cursor). next_cursor is None if no more results.

list_resources_with_entity_configs async

list_resources_with_entity_configs()

List all resources that have entity-level custom limit configs.

Uses the entity config resources registry (wide column with ref counts) for efficient O(1) lookup. Returns resources with count > 0.

Returns:

Type Description
list[str]

Sorted list of resource names with at least one entity having custom limits

set_resource_defaults async

set_resource_defaults(resource, limits, principal=None)

Store default limit configs for a resource (composite format, ADR-114).

All limits for a resource are stored in a single composite item with SK '#CONFIG'. This reduces cache-miss cost.

Parameters:

Name Type Description Default
resource str

Resource name

required
limits list[Limit]

List of Limit configurations to store

required
principal str | None

Caller identity for audit logging

None

get_resource_defaults async

get_resource_defaults(resource)

Get stored default limit configs for a resource (composite format, ADR-114).

Reads a single composite item with SK '#CONFIG' containing all limits for the resource.

delete_resource_defaults async

delete_resource_defaults(resource, principal=None)

Delete stored default limit configs for a resource (composite format, ADR-114).

Deletes the single composite config item for this resource.

Parameters:

Name Type Description Default
resource str

Resource name

required
principal str | None

Caller identity for audit logging

None

list_resources_with_defaults async

list_resources_with_defaults()

List all resources that have default limit configs from the resource registry.

set_system_defaults async

set_system_defaults(limits, on_unavailable=None, principal=None)

Store system-wide default limits and config (composite format, ADR-114).

All system limits and config (on_unavailable) are stored in a single composite item with SK '#CONFIG'. This reduces cache-miss cost.

Parameters:

Name Type Description Default
limits list[Limit]

List of Limit configurations (apply to all resources)

required
on_unavailable OnUnavailableAction | None

Behavior when DynamoDB unavailable ("allow" or "block")

None
principal str | None

Caller identity for audit logging

None

get_system_defaults async

get_system_defaults()

Get system-wide default limits and config (composite format, ADR-114).

Reads a single composite item with SK '#CONFIG' containing all limits and on_unavailable setting.

Returns:

Type Description
tuple[list[Limit], OnUnavailableAction | None]

Tuple of (limits, on_unavailable). on_unavailable may be None if not set.

delete_system_defaults async

delete_system_defaults(principal=None)

Delete all system-wide default limits and config (composite format, ADR-114).

Deletes the single composite config item for system defaults.

Parameters:

Name Type Description Default
principal str | None

Caller identity for audit logging

None

get_system_limits async

get_system_limits()

Get system-wide default limits (without config).

This is a convenience method that returns only the limits. Use get_system_defaults() to also get on_unavailable config.

get_provisioner_state async

get_provisioner_state()

Get the provisioner state record for this namespace.

Returns:

Type Description
dict[str, Any]

Dict with keys: managed_system, managed_resources, managed_entities, last_applied, applied_hash. Returns an empty state if no record exists.

put_provisioner_state async

put_provisioner_state(state)

Write the provisioner state record for this namespace.

Parameters:

Name Type Description Default
state dict[str, Any]

Dict with keys: managed_system, managed_resources, managed_entities, last_applied, applied_hash.

required

get_version_record async

get_version_record()

Get the infrastructure version record.

Returns:

Type Description
dict[str, Any] | None

Version record with schema_version, lambda_version, etc., or None if no version record exists.

ping async

ping()

Check if the DynamoDB table is reachable.

Performs a lightweight GetItem operation to verify connectivity. Does not verify the table is initialized or has valid data.

Returns:

Type Description
bool

True if the table is reachable, False otherwise.

set_version_record async

set_version_record(schema_version, lambda_version=None, client_min_version='0.0.0', updated_by=None)

Set the infrastructure version record.

Parameters:

Name Type Description Default
schema_version str

Current schema version (e.g., "1.0.0")

required
lambda_version str | None

Currently deployed Lambda version

None
client_min_version str

Minimum compatible client version

'0.0.0'
updated_by str | None

Identifier of what performed the update

None

get_audit_events async

get_audit_events(entity_id, limit=100, start_event_id=None)

Get audit events for an entity.

Parameters:

Name Type Description Default
entity_id str

ID of the entity to query

required
limit int

Maximum number of events to return

100
start_event_id str | None

Event ID to start after (for pagination)

None

Returns:

Type Description
list[AuditEvent]

List of AuditEvent objects, ordered by most recent first

get_usage_snapshots async

get_usage_snapshots(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None, limit=100, next_key=None)

Query usage snapshots with filtering and pagination.

Supports two query modes:

  1. Entity-scoped: query by entity_id (uses the primary key)
  2. Resource-scoped: query by resource across all entities (uses GSI2)

Parameters:

Name Type Description Default
entity_id str | None

Entity to query. Provide either entity_id or resource; at least one is required.

None
resource str | None

Resource name filter (required if entity_id is None)

None
window_type str | None

Filter by window type ("hourly", "daily")

None
start_time str | None

Filter snapshots >= this timestamp (ISO format)

None
end_time str | None

Filter snapshots <= this timestamp (ISO format)

None
limit int

Maximum items to fetch from DynamoDB per page (default: 100)

100
next_key dict[str, Any] | None

Pagination cursor from previous call

None

Returns:

Type Description
tuple[list[UsageSnapshot], dict[str, Any] | None]

Tuple of (snapshots, next_key). next_key is None if no more results.

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

Note

The limit parameter controls the DynamoDB query batch size. Client-side filters (window_type, start_time, end_time) are applied after fetching, so the returned count may be less than limit. Use next_key to paginate through all matching results.
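Because client-side filters can shrink each page below limit, draining all matches means looping on next_key. A hedged sketch with a hypothetical two-page backend standing in for a real Repository:

```python
import asyncio

async def all_snapshots(repo, **filters):
    # Thread next_key back into each call until no pages remain.
    results, next_key = await repo.get_usage_snapshots(**filters)
    while next_key is not None:
        page, next_key = await repo.get_usage_snapshots(next_key=next_key, **filters)
        results.extend(page)
    return results

# FakePager is a stand-in for illustration, not part of the library.
class FakePager:
    async def get_usage_snapshots(self, next_key=None, **filters):
        if next_key is None:
            return ["snap-1", "snap-2"], {"page": 2}
        return ["snap-3"], None

snapshots = asyncio.run(all_snapshots(FakePager(), resource="gpt-4"))
```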

get_usage_summary async

get_usage_summary(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None)

Aggregate usage across snapshots into a summary.

Fetches all matching snapshots (auto-paginates internally) and computes:

  • Total consumption per limit type
  • Average consumption per snapshot per limit type
  • Time range of aggregated data

Parameters:

Name Type Description Default
entity_id str | None

Entity to query

None
resource str | None

Resource name filter

None
window_type str | None

Filter by window type ("hourly", "daily")

None
start_time str | None

Filter snapshots >= this timestamp (ISO format)

None
end_time str | None

Filter snapshots <= this timestamp (ISO format)

None

Returns:

Type Description
UsageSummary

UsageSummary with aggregated statistics

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

get_resource_buckets async

get_resource_buckets(resource, limit_name=None)

Get all buckets for a resource across all entities.

With composite items, each GSI2 entry is one composite item per entity. Returns individual BucketStates, optionally filtered by limit_name.

resolve_limits async

resolve_limits(entity_id, resource)

Resolve effective limits using the four-level config hierarchy.

Uses ConfigCache with batched fetch optimization. Falls back to sequential individual GetItem calls if batch resolution fails.

Parameters:

Name Type Description Default
entity_id str

Entity to resolve limits for

required
resource str

Resource being accessed

required

Returns:

Type Description
tuple[list[Limit] | None, OnUnavailableAction | None, ConfigSource | None]

Tuple of (limits, on_unavailable, config_source)
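Unpacking the three-element result might look like this (a sketch; the formatting of the report string is illustrative):

```python
async def describe_limits(repo, entity_id, resource):
    """Resolve effective limits and report which config level supplied them."""
    limits, on_unavailable, source = await repo.resolve_limits(entity_id, resource)
    if limits is None:
        return f"no limits configured for {resource}"
    return f"{len(limits)} limit(s) from {source} config"
```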

resolve_on_unavailable async

resolve_on_unavailable()

Resolve on_unavailable from system config, with caching fallback.

Returns the on_unavailable action from system config. Caches the value after first successful load so it's available as fallback when DynamoDB is unreachable. Defaults to "block" if no system config exists and no cached value is available.

invalidate_config_cache async

invalidate_config_cache()

Invalidate all cached config entries (ADR-122).

get_cache_stats

get_cache_stats()

Get config cache performance statistics (ADR-122).

RepositoryProtocol

The RepositoryProtocol defines the interface for pluggable backends. Implement this protocol to use a different storage backend (e.g., for testing).

RepositoryProtocol

Bases: Protocol

Protocol for rate limiter data backends.

All storage backends (DynamoDB, Redis, SQLite, In-Memory) must implement this protocol to work with RateLimiter. The protocol is divided into:

  • Properties: Backend identification and configuration
  • Lifecycle: Connection management
  • Entity operations: CRUD for rate-limited entities
  • Bucket operations: Token bucket state management
  • Limit config: Stored limit configurations at entity/resource/system level
  • Version management: Schema and Lambda version tracking
  • Audit logging: Security audit trail
  • Usage snapshots: Historical consumption tracking
Example

Custom backend implementation::

    class MyBackend:
        @property
        def region(self) -> str | None:
            return "us-east-1"

        async def get_entity(self, entity_id: str) -> Entity | None:
            ...

Duck typing - no inheritance needed::

    repo = MyBackend()
    assert isinstance(repo, RepositoryProtocol)  # True at runtime

region property

region

AWS region (or None for local/in-memory backends).

endpoint_url property

endpoint_url

Custom endpoint URL (e.g., LocalStack, local DynamoDB).

stack_name property

stack_name

CloudFormation stack name.

table_name property

table_name

DynamoDB table name (same as stack_name).

namespace_name property

namespace_name

The human-readable namespace name.

namespace_id property

namespace_id

The opaque namespace ID used in DynamoDB keys.

capabilities property

capabilities

Declare which extended features this backend supports.

Returns:

Type Description
BackendCapabilities

BackendCapabilities instance with feature flags.

Example::

    if repo.capabilities.supports_audit_logging:
        events = await repo.get_audit_events(entity_id)

namespace async

namespace(name, *, on_unavailable=None, bucket_ttl_multiplier=None)

Return a scoped repository for the given namespace.

Parameters:

Name Type Description Default
name str

Namespace name to resolve.

required
on_unavailable OnUnavailableAction | None

Override on_unavailable for this namespace.

None
bucket_ttl_multiplier int | None

Override bucket TTL multiplier.

None

Returns:

Type Description
RepositoryProtocol

A repository scoped to the resolved namespace.

Raises:

Type Description
NamespaceNotFoundError

If the namespace is not registered.
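Since register_namespace is idempotent, one way to avoid NamespaceNotFoundError is to register before scoping. A sketch, with a hypothetical tenant name and an on_unavailable override:

```python
async def open_tenant_repo(repo, name):
    """Register a namespace (idempotent) and return a repo scoped to it."""
    await repo.register_namespace(name)  # returns the namespace_id; safe to repeat
    return await repo.namespace(name, on_unavailable="allow")
```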

register_namespace async

register_namespace(namespace)

Register a namespace (idempotent). Returns the namespace_id.

register_namespaces async

register_namespaces(namespaces)

Bulk-register namespaces. Returns {name: namespace_id}.

get_namespace async

get_namespace(namespace)

Get namespace details by name. Returns dict or None.

list_namespaces async

list_namespaces()

List active namespaces. Returns [{name, namespace_id, created_at}].

delete_namespace async

delete_namespace(namespace)

Soft-delete a namespace. No-op if not found.

recover_namespace async

recover_namespace(namespace_id)

Recover a deleted namespace. Returns the namespace name.

list_orphan_namespaces async

list_orphan_namespaces()

List deleted namespaces. Returns [{namespace_id, namespace, deleted_at}].

purge_namespace async

purge_namespace(namespace_id)

Purge all data for a deleted namespace. No-op if not found.

close async

close()

Close the backend connection and release resources.

Must be called when the repository is no longer needed. Safe to call multiple times.

ping async

ping()

Check if the backend is reachable.

Returns:

Type Description
bool

True if the backend is accessible, False otherwise.
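A lifecycle sketch combining ping with the close contract above (close is documented as safe to call multiple times, so a finally block is a natural fit):

```python
async def check_and_close(repo):
    """Ping the backend, then always release the connection."""
    try:
        return await repo.ping()
    finally:
        await repo.close()  # safe to call multiple times
```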

ensure_infrastructure async

ensure_infrastructure()

Ensure backend infrastructure exists.

For DynamoDB: Creates CloudFormation stack if stack_options was provided to the constructor. For other backends: May create required resources or be a no-op.

Uses the options passed to the constructor. No-op if not provided.

create_entity async

create_entity(entity_id, name=None, parent_id=None, cascade=False, metadata=None, principal=None)

Create a new entity.

Parameters:

Name Type Description Default
entity_id str

Unique identifier for the entity

required
name str | None

Human-readable name (defaults to entity_id)

None
parent_id str | None

Parent entity ID for hierarchical limits

None
cascade bool

If True, acquire() will also consume from parent entity

False
metadata dict[str, str] | None

Additional key-value metadata

None
principal str | None

Caller identity for audit logging

None

Returns:

Type Description
Entity

The created Entity

Raises:

Type Description
EntityExistsError

If entity already exists
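A sketch of building a two-level hierarchy with cascade; the org name and metadata are hypothetical, and the get-then-create check here is not atomic (a concurrent creator could still trigger EntityExistsError):

```python
async def create_org_member(repo, org_id, user_id):
    """Create a parent org (if missing) and a cascading child under it."""
    if await repo.get_entity(org_id) is None:  # note: get-then-create is not atomic
        await repo.create_entity(org_id, name="Acme Org")  # hypothetical name
    return await repo.create_entity(
        user_id,
        parent_id=org_id,
        cascade=True,  # acquire() on the child also consumes from the parent
        metadata={"tier": "pro"},  # hypothetical metadata
    )
```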

get_entity async

get_entity(entity_id)

Get an entity by ID.

Parameters:

Name Type Description Default
entity_id str

Entity identifier

required

Returns:

Type Description
Entity | None

Entity if found, None otherwise

delete_entity async

delete_entity(entity_id, principal=None)

Delete an entity and all related records.

Parameters:

Name Type Description Default
entity_id str

Entity to delete

required
principal str | None

Caller identity for audit logging

None

get_children async

get_children(parent_id)

Get all child entities of a parent.

Parameters:

Name Type Description Default
parent_id str

Parent entity ID

required

Returns:

Type Description
list[Entity]

List of child entities

get_bucket async

get_bucket(entity_id, resource, limit_name)

Get a token bucket by entity/resource/limit.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name (e.g., "gpt-4")

required
limit_name str

Limit name (e.g., "tpm", "rpm")

required

Returns:

Type Description
BucketState | None

BucketState if found, None otherwise

get_buckets async

get_buckets(entity_id, resource=None, shard_id=0)

Get token buckets for an entity.

When resource is specified, fetches the bucket at the given shard_id. When resource is None, uses GSI3 to discover all buckets across resources and shards.

Parameters:

Name Type Description Default
entity_id str

Entity owning the buckets

required
resource str | None

Resource name filter (None for all resources)

None
shard_id int

Shard to fetch when resource is specified

0

Returns:

Type Description
list[BucketState]

List of bucket states

get_or_create_bucket async

get_or_create_bucket(entity_id, resource, limit)

Get an existing bucket or create a new one with the given limit.

This is the primary method for initializing token buckets. If a bucket exists, it is returned. If not, a new bucket is created with capacity set to the limit's capacity.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name (e.g., "gpt-4")

required
limit Limit

Limit configuration for the bucket

required

Returns:

Type Description
BucketState

Existing or newly created BucketState
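Pairing this with resolve_limits gives one idiomatic initialization flow: resolve the effective limits, then ensure a bucket exists for each. A sketch under that assumption:

```python
async def ensure_buckets(repo, entity_id, resource):
    """Create (or fetch) one bucket per effective limit on a resource."""
    limits, _on_unavailable, _source = await repo.resolve_limits(entity_id, resource)
    return [
        await repo.get_or_create_bucket(entity_id, resource, limit)
        for limit in (limits or [])
    ]
```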

batch_get_buckets async

batch_get_buckets(keys)

Batch get composite buckets in a single call.

Parameters:

Name Type Description Default
keys list[tuple[str, str]]

List of (entity_id, resource) tuples

required

Returns:

Type Description
dict[tuple[str, str, str], BucketState]

Dict mapping (entity_id, resource, limit_name) to BucketState.

batch_get_entity_and_buckets async

batch_get_entity_and_buckets(entity_id, bucket_keys)

Fetch entity metadata and composite buckets in a single call.

Parameters:

Name Type Description Default
entity_id str

Entity whose metadata to include

required
bucket_keys list[tuple[str, str]]

List of (entity_id, resource) for composite buckets

required

Returns:

Type Description
tuple[Entity | None, dict[tuple[str, str, str], BucketState]]

Tuple of (entity_or_none, bucket_dict).

get_resource_buckets async

get_resource_buckets(resource, limit_name=None)

Get all buckets for a resource across all entities.

Used for capacity reporting and aggregation.

Parameters:

Name Type Description Default
resource str

Resource name

required
limit_name str | None

Optional filter by limit name

None

Returns:

Type Description
list[BucketState]

List of bucket states

build_bucket_put_item

build_bucket_put_item(state, ttl_seconds=86400)

Build a transaction item for upserting a bucket.

This is a synchronous method used to build transaction payloads.

Parameters:

Name Type Description Default
state BucketState

Bucket state to persist

required
ttl_seconds int

Time-to-live for the record

86400

Returns:

Type Description
dict[str, Any]

Transaction item dict for use with transact_write

build_composite_create

build_composite_create(entity_id, resource, states, now_ms, ttl_seconds=86400, cascade=False, parent_id=None)

Build a PutItem for creating a new composite bucket.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
states list[BucketState]

BucketState objects for each limit

required
now_ms int

Current timestamp in milliseconds

required
ttl_seconds int | None

TTL in seconds from now, or None to omit TTL

86400
cascade bool

Whether the entity has cascade enabled

False
parent_id str | None

The entity's parent_id (if any)

None

build_composite_normal

build_composite_normal(entity_id, resource, consumed, refill_amounts, now_ms, expected_rf, ttl_seconds=None)

Build an UpdateItem for the normal write path (ADR-115 path 2).

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
consumed dict[str, int]

Amount consumed per limit (millitokens)

required
refill_amounts dict[str, int]

Refill amount per limit (millitokens)

required
now_ms int

Current timestamp in milliseconds

required
expected_rf int

Expected refill timestamp for optimistic lock

required
ttl_seconds int | None

TTL behavior (None=no change, 0=remove, >0=set)

None

build_composite_retry

build_composite_retry(entity_id, resource, consumed)

Build an UpdateItem for the retry write path (ADR-115 path 3).

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
consumed dict[str, int]

Amount consumed per limit (millitokens)

required

build_composite_adjust

build_composite_adjust(entity_id, resource, deltas)

Build an UpdateItem for the adjust write path (ADR-115 path 4).

Unconditional ADD for post-hoc correction. Can go negative by design. Positive delta = consumed more (subtract tokens, add to counter). Negative delta = consumed less (add tokens, subtract from counter).

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
deltas dict[str, int]

Delta per limit (millitokens, positive=consume, negative=release)

required

transact_write async

transact_write(items)

Execute a write of one or more items.

Single-item batches use the corresponding single-item API (PutItem, UpdateItem, or DeleteItem) to halve WCU cost. Multi-item batches use TransactWriteItems for atomicity.

Parameters:

Name Type Description Default
items list[dict[str, Any]]

List of transaction items from build_bucket_put_item

required

Raises:

Type Description
TransactionCanceledException

If multi-item transaction fails

ConditionalCheckFailedException

If single-item condition fails
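The build/write split above can be sketched as: build one item per bucket state synchronously, then hand the batch to transact_write, which picks the cheaper single-item API when only one item is present. The 3600-second TTL is an arbitrary example value:

```python
async def persist_buckets(repo, states):
    """Build one item per bucket state, then write them in one call."""
    items = [repo.build_bucket_put_item(state, ttl_seconds=3600) for state in states]
    await repo.transact_write(items)  # 1 item -> PutItem; 2+ -> TransactWriteItems
    return len(items)
```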

write_each async

write_each(items)

Write items independently without cross-item atomicity.

Each item is dispatched as a single PutItem, UpdateItem, or DeleteItem call (1 WCU each). Use for unconditional writes (e.g., ADD adjustments) where partial success is acceptable.

Parameters:

Name Type Description Default
items list[dict[str, Any]]

List of items to write independently

required

speculative_consume async

speculative_consume(entity_id, resource, consume, ttl_seconds=None, shard_id=None)

Attempt speculative UpdateItem with condition check.

Issues an UpdateItem with ADD -consumed and condition attribute_exists(PK) AND tk >= consumed for each limit. Uses ReturnValuesOnConditionCheckFailure=ALL_OLD to return the current item state on failure.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
consume dict[str, int]

Amount per limit (tokens, not milli)

required
ttl_seconds int | None

TTL in seconds from now, or None for no TTL change

None
shard_id int | None

Explicit shard to target (skips random selection and cascade logic). None means auto-select from entity cache.

None

Returns:

Type Description
SpeculativeResult

SpeculativeResult with success flag and either:

  • On success: buckets, cascade, parent_id from ALL_NEW
  • On failure with ALL_OLD: old_buckets from ALL_OLD
  • On failure without ALL_OLD: old_buckets is None (bucket missing)
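The three outcomes can be distinguished by checking the success flag and old_buckets, as sketched below (attribute names taken from the returns description above; the returned strings are illustrative):

```python
async def try_consume(repo, entity_id, resource, amounts):
    """Classify the three possible speculative_consume outcomes."""
    result = await repo.speculative_consume(entity_id, resource, amounts)
    if result.success:
        return "consumed"
    if result.old_buckets is None:
        return "bucket missing"  # create it, e.g. via get_or_create_bucket, then retry
    return "insufficient tokens"
```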

bump_shard_count async

bump_shard_count(entity_id, resource, current_count)

Double shard_count on shard 0 via conditional write.

Also updates the entity cache with the new shard_count.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
current_count int

Expected current shard_count

required

Returns:

Type Description
int

The new shard_count, or current_count if another client already doubled.

reconcile_bucket_to_defaults async

reconcile_bucket_to_defaults(entity_id, resource, effective_limits, stale_limit_names=None)

Reconcile bucket to effective defaults after config deletion (issue #327).

Updates limit fields to match effective limits, sets TTL, and removes stale limit attributes. No-op if bucket doesn't exist.

Parameters:

Name Type Description Default
entity_id str

Entity owning the bucket

required
resource str

Resource name

required
effective_limits list[Limit]

New effective limits (resource/system defaults)

required
stale_limit_names set[str] | None

Limit names to REMOVE from bucket

None

set_limits async

set_limits(entity_id, limits, resource='_default_', principal=None)

Store limit configs for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to configure

required
limits list[Limit]

Limit configurations

required
resource str

Resource these limits apply to

'_default_'
principal str | None

Caller identity for audit logging

None

get_limits async

get_limits(entity_id, resource='_default_')

Get stored limit configs for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to query

required
resource str

Resource to get limits for

'_default_'

Returns:

Type Description
list[Limit]

List of limits (empty if none configured)

delete_limits async

delete_limits(entity_id, resource='_default_', principal=None)

Delete stored limit configs for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to delete limits for

required
resource str

Resource to delete limits for

'_default_'
principal str | None

Caller identity for audit logging

None

list_entities_with_custom_limits async

list_entities_with_custom_limits(resource, limit=None, cursor=None)

List all entities that have custom limit configurations for a resource.

Uses GSI3 sparse index for efficient queries.

Parameters:

Name Type Description Default
resource str

Resource to filter by (required).

required
limit int | None

Maximum number of entities to return. None for all.

None
cursor str | None

Pagination cursor from previous call. None for first page.

None

Returns:

Type Description
tuple[list[str], str | None]

Tuple of (entity_ids, next_cursor). next_cursor is None if no more results.
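Draining the cursor-based pagination above follows the same loop shape as the snapshot API, with a string cursor instead of a key dict. A minimal sketch:

```python
async def all_custom_limit_entities(repo, resource, page_size=50):
    """Page through every entity with custom limits for a resource."""
    entity_ids, cursor = await repo.list_entities_with_custom_limits(resource, limit=page_size)
    collected = list(entity_ids)
    while cursor is not None:
        entity_ids, cursor = await repo.list_entities_with_custom_limits(
            resource, limit=page_size, cursor=cursor
        )
        collected.extend(entity_ids)
    return collected
```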

list_resources_with_entity_configs async

list_resources_with_entity_configs()

List all resources that have entity-level custom limit configurations.

Uses the entity config resources registry for efficient O(1) lookup.

Returns:

Type Description
list[str]

Sorted list of resource names with at least one entity having custom limits

set_resource_defaults async

set_resource_defaults(resource, limits, principal=None)

Store default limits for a resource.

Resource defaults apply to all entities accessing this resource unless overridden at the entity level.

Parameters:

Name Type Description Default
resource str

Resource name

required
limits list[Limit]

Default limits

required
principal str | None

Caller identity for audit logging

None

get_resource_defaults async

get_resource_defaults(resource)

Get default limits for a resource.

Parameters:

Name Type Description Default
resource str

Resource name

required

Returns:

Type Description
list[Limit]

List of limits (empty if none configured)

delete_resource_defaults async

delete_resource_defaults(resource, principal=None)

Delete default limits for a resource.

Parameters:

Name Type Description Default
resource str

Resource name

required
principal str | None

Caller identity for audit logging

None

list_resources_with_defaults async

list_resources_with_defaults()

List all resources that have default limits configured.

Returns:

Type Description
list[str]

List of resource names

set_system_defaults async

set_system_defaults(limits, on_unavailable=None, principal=None)

Store system-wide default limits and config.

System defaults apply to ALL resources unless overridden.

Parameters:

Name Type Description Default
limits list[Limit]

Global default limits

required
on_unavailable OnUnavailableAction | None

Behavior when backend unavailable ("allow" or "block")

None
principal str | None

Caller identity for audit logging

None

get_system_defaults async

get_system_defaults()

Get system-wide default limits and config.

Returns:

Type Description
tuple[list[Limit], OnUnavailableAction | None]

Tuple of (limits, on_unavailable). on_unavailable may be None.
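A round-trip sketch for the system-defaults pair of methods; the principal string is hypothetical, and limits is assumed to be a list of Limit objects built elsewhere:

```python
async def install_system_defaults(repo, limits):
    """Write fail-closed global defaults, then read them back to confirm."""
    await repo.set_system_defaults(limits, on_unavailable="block", principal="admin")
    return await repo.get_system_defaults()
```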

delete_system_defaults async

delete_system_defaults(principal=None)

Delete all system-wide defaults.

Parameters:

Name Type Description Default
principal str | None

Caller identity for audit logging

None

get_version_record async

get_version_record()

Get the infrastructure version record.

Returns:

Type Description
dict[str, Any] | None

Version record with schema_version, lambda_version, etc.

dict[str, Any] | None

None if no version record exists.

set_version_record async

set_version_record(schema_version, lambda_version=None, client_min_version='0.0.0', updated_by=None)

Set the infrastructure version record.

Parameters:

Name Type Description Default
schema_version str

Current schema version (e.g., "1.0.0")

required
lambda_version str | None

Currently deployed Lambda version

None
client_min_version str

Minimum compatible client version

'0.0.0'
updated_by str | None

Identifier of what performed the update

None

get_audit_events async

get_audit_events(entity_id, limit=100, start_event_id=None)

Get audit events for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to query

required
limit int

Maximum events to return

100
start_event_id str | None

Pagination cursor (event ID to start after)

None

Returns:

Type Description
list[AuditEvent]

List of audit events, most recent first

get_usage_snapshots async

get_usage_snapshots(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None, limit=100, next_key=None)

Query usage snapshots with filtering and pagination.

Parameters:

Name Type Description Default
entity_id str | None

Entity to query (uses primary key)

None
resource str | None

Resource filter (required if entity_id is None)

None
window_type str | None

Filter by "hourly" or "daily"

None
start_time str | None

Filter snapshots >= this timestamp (ISO format)

None
end_time str | None

Filter snapshots <= this timestamp (ISO format)

None
limit int

Maximum items per page

100
next_key dict[str, Any] | None

Pagination cursor from previous call

None

Returns:

Type Description
tuple[list[UsageSnapshot], dict[str, Any] | None]

Tuple of (snapshots, next_key). next_key is None if no more results.

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

get_usage_summary async

get_usage_summary(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None)

Aggregate usage across snapshots into a summary.

Parameters:

Name Type Description Default
entity_id str | None

Entity to query

None
resource str | None

Resource filter

None
window_type str | None

Filter by "hourly" or "daily"

None
start_time str | None

Filter snapshots >= this timestamp

None
end_time str | None

Filter snapshots <= this timestamp

None

Returns:

Type Description
UsageSummary

UsageSummary with total, average, and time range

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

resolve_limits async

resolve_limits(entity_id, resource)

Resolve effective limits using the four-level config hierarchy.

Resolution order (first non-empty wins):

  1. Entity-level config for this specific resource
  2. Entity-level _default_ config (fallback for all resources)
  3. Resource-level defaults
  4. System-level defaults

Backend implementations should use native caching and resolution strategies. For DynamoDB, this uses ConfigCache with batched fetch optimization.

Parameters:

Name Type Description Default
entity_id str

Entity to resolve limits for

required
resource str

Resource being accessed

required

Returns:

Type Description
tuple[list[Limit] | None, OnUnavailableAction | None, ConfigSource | None]

Tuple of (limits, on_unavailable, config_source) where:

  • limits: Resolved limits or None if nothing found at any level
  • on_unavailable: System-level unavailability behavior (if any)
  • config_source: "entity", "entity_default", "resource", "system", or None if no config found

resolve_on_unavailable async

resolve_on_unavailable()

Resolve on_unavailable from system config, with caching fallback.

Returns the on_unavailable action. Caches the value after first successful load for use as fallback when DynamoDB is unreachable. Defaults to "block" if no system config and no cached value.

invalidate_config_cache async

invalidate_config_cache()

Invalidate all cached config entries.

Backends with caching should evict all entries. Backends without caching (e.g., Redis with native TTL) should no-op.

Call after modifying config to force immediate refresh.
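The modify-then-invalidate pattern recommended above might be sketched as:

```python
async def update_limits_now(repo, entity_id, limits):
    """Apply new entity limits and evict stale cached config immediately."""
    await repo.set_limits(entity_id, limits)
    await repo.invalidate_config_cache()  # skips waiting out config_cache_ttl
```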

get_cache_stats

get_cache_stats()

Get config cache performance statistics.

Returns statistics for monitoring cache behavior:

  • hits: Cache hits (avoided backend reads)
  • misses: Cache misses (backend reads performed)
  • size: Current entry count
  • ttl: TTL in seconds

Backends without caching should return zero-valued stats.

Returns:

Type Description
CacheStats

CacheStats object with cache metrics