RateLimiter

The main rate limiter classes for async and sync usage.

RateLimiter (Async)

RateLimiter

RateLimiter(repository=None, name=None, region=None, endpoint_url=None, stack_options=None, on_unavailable=_UNSET, auto_update=_UNSET, bucket_ttl_refill_multiplier=_UNSET, speculative_writes=True)

Async rate limiter backed by DynamoDB.

Implements token bucket algorithm with support for:

- Multiple limits per entity/resource
- Two-level hierarchy (parent/child entities)
- Cascade mode (consume from entity + parent)
- Stored limit configs
- Usage analytics
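For readers new to the token bucket algorithm named above, here is a minimal in-memory sketch. It is not the library's storage model: the TokenBucket class, its field names, and the explicit `now` argument are assumptions made for illustration only.

```python
from dataclasses import dataclass


@dataclass
class TokenBucket:
    """Illustrative in-memory token bucket (hypothetical, not the library's model)."""

    capacity: int          # maximum tokens the bucket can hold
    refill_per_sec: float  # tokens added per second
    tokens: float          # current balance
    updated_at: float      # time of the last refill, in seconds

    def refill(self, now: float) -> None:
        """Add tokens for the elapsed time, capped at capacity."""
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.updated_at = now

    def try_consume(self, amount: int, now: float) -> bool:
        """Consume `amount` tokens if the balance covers it; report success."""
        self.refill(now)
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False


bucket = TokenBucket(capacity=10, refill_per_sec=2.0, tokens=10.0, updated_at=0.0)
first = bucket.try_consume(8, now=0.0)   # balance goes from 10 to 2
second = bucket.try_consume(5, now=1.0)  # refilled to 4, still short of 5
third = bucket.try_consume(5, now=2.0)   # refilled to 6, so this succeeds
```

Passing `now` explicitly keeps the sketch deterministic; a real limiter would read a clock instead.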

Example (new API - preferred):

from zae_limiter import RateLimiter, Repository, StackOptions

repo = Repository(
    name="my-app",
    region="us-east-1",
    stack_options=StackOptions(),
)
limiter = RateLimiter(repository=repo)

Example (old API - deprecated):

limiter = RateLimiter(
    name="my-app",
    region="us-east-1",
    stack_options=StackOptions(),
)

Parameters:

Name Type Description Default
repository RepositoryProtocol | None

Repository instance (new API, preferred). Pass a Repository or any RepositoryProtocol implementation.

None
name str | None

DEPRECATED. Use Repository(name=...) instead.

None
region str | None

DEPRECATED. Use Repository(region=...) instead.

None
endpoint_url str | None

DEPRECATED. Use Repository(endpoint_url=...) instead.

None
stack_options StackOptions | None

DEPRECATED. Use Repository(stack_options=...) instead.

None
on_unavailable OnUnavailable | Any

DEPRECATED. Use set_system_defaults(on_unavailable=...) or pass on_unavailable= to acquire() instead.

_UNSET
auto_update bool | Any

DEPRECATED. Use Repository.builder(...).auto_update().build() instead.

_UNSET
bucket_ttl_refill_multiplier int | Any

DEPRECATED. Use Repository.builder(...).bucket_ttl_multiplier().build() instead.

_UNSET
speculative_writes bool

Enable speculative UpdateItem fast path. When True, acquire() tries a speculative write first, falling back to the full read-write path only when needed.

True

Raises:

Type Description
ValueError

If both repository and name/region/endpoint_url/stack_options are provided.
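The speculative_writes parameter above describes a fast path with a fallback. This page does not show how the library implements it, so the following is only an in-memory sketch of the general "try a conditional write first, fall back to read-then-write" pattern. Bucket, speculative_consume, full_consume, and consume are hypothetical names, and the plain `if` stands in for a conditional update on the store.

```python
from dataclasses import dataclass


@dataclass
class Bucket:
    tokens: float
    capacity: float
    refill_per_sec: float
    updated_at: float


def speculative_consume(bucket: Bucket, amount: float) -> bool:
    """Fast path: one conditional write that succeeds only if the stored
    balance already covers the request (no read, no refill computation)."""
    if bucket.tokens >= amount:  # stands in for a write condition
        bucket.tokens -= amount
        return True
    return False


def full_consume(bucket: Bucket, amount: float, now: float) -> bool:
    """Slow path: read the bucket, apply the refill, then decide."""
    elapsed = max(0.0, now - bucket.updated_at)
    bucket.tokens = min(bucket.capacity, bucket.tokens + elapsed * bucket.refill_per_sec)
    bucket.updated_at = now
    if bucket.tokens >= amount:
        bucket.tokens -= amount
        return True
    return False


def consume(bucket: Bucket, amount: float, now: float, speculative: bool = True):
    """Return (granted, path_taken)."""
    if speculative and speculative_consume(bucket, amount):
        return True, "fast"
    return full_consume(bucket, amount, now), "slow"


b = Bucket(tokens=3.0, capacity=10.0, refill_per_sec=1.0, updated_at=0.0)
r1 = consume(b, 2, now=0.0)   # stored balance is enough: fast path
r2 = consume(b, 4, now=5.0)   # stored balance is stale: slow path refills, then grants
r3 = consume(b, 50, now=5.0)  # exceeds capacity: slow path rejects
```

The fast path can only reject spuriously, because the stored balance never overstates what a refill would give; that is why a failed speculative attempt falls through to the full path rather than raising immediately.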

name property

name

DEPRECATED. Use repository.stack_name instead.

stack_name property

stack_name

DEPRECATED. Use repository.stack_name instead.

table_name property

table_name

DEPRECATED. Use repository.stack_name instead.

list_deployed async classmethod

list_deployed(region=None, endpoint_url=None)

List all deployed rate limiter instances in a region.

This is a class method that discovers existing deployments without requiring an initialized RateLimiter instance. It queries CloudFormation for stacks tagged with ManagedBy=zae-limiter.

Parameters:

Name Type Description Default
region str | None

AWS region (default: use boto3 defaults)

None
endpoint_url str | None

CloudFormation endpoint (for LocalStack)

None

Returns:

Type Description
list[LimiterInfo]

List of LimiterInfo objects describing deployed instances, sorted by user-friendly name. Excludes deleted stacks.

Example

# Discover all limiters in us-east-1
limiters = await RateLimiter.list_deployed(region="us-east-1")
for limiter in limiters:
    if limiter.is_healthy:
        print(f"✓ {limiter.user_name}: {limiter.version}")
    elif limiter.is_failed:
        print(f"✗ {limiter.user_name}: {limiter.stack_status}")

Raises:

Type Description
ClientError

If CloudFormation API call fails

close async

close()

Close the underlying connections.

is_available async

is_available(timeout=1.0)

Check if the rate limiter backend (DynamoDB) is reachable.

Performs a lightweight health check without requiring initialization. This method never raises exceptions - it returns False on any error.

Parameters:

Name Type Description Default
timeout float

Maximum time in seconds to wait for response (default: 1.0)

1.0

Returns:

Type Description
bool

True if DynamoDB table is reachable, False otherwise.

Example

limiter = RateLimiter(name="my-app", region="us-east-1")
if await limiter.is_available():
    async with limiter.acquire(...) as lease:
        ...
else:
    # Handle degraded mode
    pass
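The contract described for is_available() (bounded by a timeout, never raises, returns False on any error) can be sketched with asyncio.wait_for. This is an illustration of the contract, not the library's code: check_reachable and the three probe coroutines are hypothetical stand-ins for a real backend call.

```python
import asyncio
from typing import Awaitable, Callable


async def check_reachable(
    probe: Callable[[], Awaitable[object]], timeout: float = 1.0
) -> bool:
    """Return True if `probe()` completes within `timeout` seconds, else False."""
    try:
        await asyncio.wait_for(probe(), timeout=timeout)
        return True
    except Exception:
        # Covers the timeout as well as any error raised by the probe itself.
        return False


async def healthy_probe() -> None:
    await asyncio.sleep(0)


async def slow_probe() -> None:
    await asyncio.sleep(1.0)


async def failing_probe() -> None:
    raise ConnectionError("backend unreachable")


async def main() -> list:
    return [
        await check_reachable(healthy_probe),
        await check_reachable(slow_probe, timeout=0.01),
        await check_reachable(failing_probe),
    ]


results = asyncio.run(main())
```

Catching Exception rather than BaseException leaves task cancellation untouched, so a caller can still cancel the health check.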

create_entity async

create_entity(entity_id, name=None, parent_id=None, cascade=False, metadata=None, principal=None)

Create a new entity.

Parameters:

Name Type Description Default
entity_id str

Unique identifier for the entity

required
name str | None

Human-readable name (defaults to entity_id)

None
parent_id str | None

Parent entity ID (None for root/project entities)

None
cascade bool

If True, acquire() will also consume from parent entity

False
metadata dict[str, str] | None

Additional metadata to store

None
principal str | None

Caller identity for audit logging (optional)

None

Returns:

Type Description
Entity

The created Entity

Raises:

Type Description
EntityExistsError

If entity already exists

get_entity async

get_entity(entity_id)

Get an entity by ID.

delete_entity async

delete_entity(entity_id, principal=None)

Delete an entity and all its related data.

Parameters:

Name Type Description Default
entity_id str

ID of the entity to delete

required
principal str | None

Caller identity for audit logging (optional)

None

get_children async

get_children(parent_id)

Get all children of a parent entity.

get_audit_events async

get_audit_events(entity_id, limit=100, start_event_id=None)

Get audit events for an entity.

Retrieves security audit events logged for administrative operations on the specified entity, ordered by most recent first.

Parameters:

Name Type Description Default
entity_id str

ID of the entity to query

required
limit int

Maximum number of events to return (default: 100)

100
start_event_id str | None

Event ID to start after (for pagination)

None

Returns:

Type Description
list[AuditEvent]

List of AuditEvent objects, ordered by most recent first

Example

events = await limiter.get_audit_events("proj-1")
for event in events:
    print(f"{event.timestamp}: {event.action} by {event.principal}")

get_usage_snapshots async

get_usage_snapshots(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None, limit=100, next_key=None)

Query usage snapshots for historical consumption data.

Usage snapshots are created by the aggregator Lambda from DynamoDB stream events. They track token consumption per entity/resource within time windows (hourly, daily).

Supports two query modes:

1. Entity-scoped: Provide entity_id (optionally with resource filter)
2. Resource-scoped: Provide resource to query across all entities

Parameters:

Name Type Description Default
entity_id str | None

Entity to query (uses primary key)

None
resource str | None

Resource name filter (required if entity_id is None)

None
window_type str | None

Filter by window type ("hourly", "daily")

None
start_time datetime | None

Filter snapshots >= this timestamp

None
end_time datetime | None

Filter snapshots <= this timestamp

None
limit int

Maximum items to fetch from DynamoDB per page (default: 100)

100
next_key dict[str, Any] | None

Pagination cursor from previous call

None

Returns:

Type Description
tuple[list[UsageSnapshot], dict[str, Any] | None]

Tuple of (snapshots, next_key). next_key is None if no more results.

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

Note

The limit parameter controls the DynamoDB query batch size. Client-side filters (window_type, start_time, end_time) are applied after fetching, so the returned count may be less than limit. Use next_key to paginate through all matching results.

Example

# Get hourly snapshots for an entity
snapshots, cursor = await limiter.get_usage_snapshots(
    entity_id="user-123",
    resource="gpt-4",
    window_type="hourly",
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 31),
)
for snap in snapshots:
    print(f"{snap.window_start}: {snap.counters}")

# Paginate through results
while cursor:
    more, cursor = await limiter.get_usage_snapshots(
        entity_id="user-123",
        next_key=cursor,
    )
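The note on limit and client-side filters has a practical consequence: a page can come back short, or even empty, while next_key is still set. The sketch below shows this with an in-memory stand-in; ROWS, fetch_page, and fetch_all are hypothetical, and the offset-based cursor is an assumption chosen only to keep the example self-contained.

```python
import asyncio
from typing import Any, Optional

# Hypothetical rows standing in for stored snapshots: (window_type, value).
ROWS = [("hourly", 1), ("hourly", 2), ("daily", 3), ("daily", 4), ("hourly", 5)]


async def fetch_page(window_type: Optional[str], limit: int, next_key: Optional[dict]):
    """Stand-in for a paged query: read `limit` rows, then filter client-side."""
    start = next_key["offset"] if next_key else 0
    batch = ROWS[start:start + limit]
    matches = [row for row in batch if window_type is None or row[0] == window_type]
    end = start + len(batch)
    cursor: Optional[dict[str, Any]] = {"offset": end} if end < len(ROWS) else None
    return matches, cursor


async def fetch_all(window_type: str, limit: int):
    """Follow the cursor until it is None, recording the size of each page."""
    collected, page_sizes = [], []
    cursor = None
    while True:
        page, cursor = await fetch_page(window_type, limit, cursor)
        collected.extend(page)
        page_sizes.append(len(page))
        if cursor is None:  # stop on the cursor, never on an empty page
            break
    return collected, page_sizes


snapshots, sizes = asyncio.run(fetch_all("hourly", limit=2))
```

Here the second page is empty because both rows it fetched were filtered out, yet more matching data follows, so the loop condition must depend on the cursor alone.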

get_usage_summary async

get_usage_summary(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None)

Get aggregated usage summary across multiple snapshots.

Fetches all matching snapshots and computes total and average consumption statistics. Useful for billing, reporting, and capacity planning.

Parameters:

Name Type Description Default
entity_id str | None

Entity to query

None
resource str | None

Resource name filter (required if entity_id is None)

None
window_type str | None

Filter by window type ("hourly", "daily")

None
start_time datetime | None

Filter snapshots >= this timestamp

None
end_time datetime | None

Filter snapshots <= this timestamp

None

Returns:

Type Description
UsageSummary

UsageSummary with total and average consumption per limit type

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

Example

summary = await limiter.get_usage_summary(
    entity_id="user-123",
    resource="gpt-4",
    window_type="hourly",
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 31),
)
print(f"Total tokens: {summary.total.get('tpm', 0)}")
print(f"Average per hour: {summary.average.get('tpm', 0.0):.1f}")
print(f"Snapshots: {summary.snapshot_count}")
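To make the "total and average" wording concrete, the following sketch aggregates per-snapshot counters in memory. The summarize function is hypothetical, and it assumes the average divides each total by the number of snapshots, including snapshots where that counter is absent; the library's exact definition is not stated on this page.

```python
from collections import Counter


def summarize(snapshots: list) -> tuple:
    """Return (total, average, snapshot_count) over per-snapshot counter dicts."""
    total: Counter = Counter()
    for counters in snapshots:
        total.update(counters)  # adds values key by key
    count = len(snapshots)
    # Assumption: average = total / number of snapshots in the window.
    average = {name: value / count for name, value in total.items()} if count else {}
    return dict(total), average, count


hourly = [{"tpm": 1200, "rpm": 10}, {"tpm": 800, "rpm": 6}, {"tpm": 1000}]
total, average, count = summarize(hourly)
```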

acquire async

acquire(entity_id, resource, consume, limits=None, use_stored_limits=False, on_unavailable=None)

Acquire rate limit capacity.

Limits are resolved automatically from stored config using a four-tier hierarchy: Entity > Entity Default > Resource > System. Pass limits to override.

Cascade behavior is controlled by the entity's cascade flag, set at entity creation time via create_entity(cascade=True). When enabled, acquire() automatically consumes from both the entity and its parent.

Parameters:

Name Type Description Default
entity_id str

Entity to acquire capacity for

required
resource str

Resource being accessed (e.g., "gpt-4")

required
consume dict[str, int]

Amounts to consume by limit name

required
limits list[Limit] | None

Override stored config with explicit limits (optional)

None
use_stored_limits bool

DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0.

False
on_unavailable OnUnavailable | None

Override default on_unavailable behavior

None

Yields:

Type Description
AsyncIterator[Lease]

Lease for managing additional consumption

Raises:

Type Description
RateLimitExceeded

If any limit would be exceeded

RateLimiterUnavailable

If DynamoDB is unavailable and on_unavailable is BLOCK

ValidationError

If no limits configured at any level
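The four-tier resolution described for acquire() can be sketched as a first-match lookup. This is an illustration under stated assumptions, not the library's code: resolve_limits and the dict-based stores are hypothetical, "Entity Default" is assumed to mean the entity's limits stored under the default resource, the DEFAULT_RESOURCE value shown is a placeholder, and LookupError stands in for the documented ValidationError.

```python
# Placeholder sentinel; the real DEFAULT_RESOURCE value is not shown on this page.
DEFAULT_RESOURCE = "__default__"


def resolve_limits(entity_id, resource, entity_limits, resource_defaults,
                   system_defaults, override=None):
    """Return (tier, limits) from the first tier that has a config."""
    if override:
        return "override", override  # explicit limits win, as stated for acquire()
    candidates = [
        ("entity", entity_limits.get((entity_id, resource))),
        ("entity_default", entity_limits.get((entity_id, DEFAULT_RESOURCE))),
        ("resource", resource_defaults.get(resource)),
        ("system", system_defaults),
    ]
    for tier, limits in candidates:
        if limits:
            return tier, limits
    raise LookupError("no limits configured at any level")


entity_limits = {
    ("user-1", "gpt-4"): {"rpm": 100},
    ("user-2", DEFAULT_RESOURCE): {"rpm": 50},
}
resource_defaults = {"gpt-4": {"rpm": 20}}
system_defaults = {"rpm": 5}

r1 = resolve_limits("user-1", "gpt-4", entity_limits, resource_defaults, system_defaults)
r2 = resolve_limits("user-2", "gpt-4", entity_limits, resource_defaults, system_defaults)
r3 = resolve_limits("user-3", "gpt-4", entity_limits, resource_defaults, system_defaults)
r4 = resolve_limits("user-3", "other", entity_limits, resource_defaults, system_defaults)
```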

available async

available(entity_id, resource, limits=None, use_stored_limits=False)

Check available capacity without consuming.

Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System. If no stored limits are found, falls back to the limits parameter.

Returns minimum available across entity (and parent if cascade). Can return negative values if bucket is in debt.

Parameters:

Name Type Description Default
entity_id str

Entity to check

required
resource str

Resource to check

required
limits list[Limit] | None

Override limits (optional, falls back to stored config)

None
use_stored_limits bool

DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0.

False

Returns:

Type Description
dict[str, int]

Dict mapping limit_name -> available tokens

Raises:

Type Description
ValidationError

If no limits found at any level and no override provided
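The "minimum available across entity (and parent if cascade)" rule for available() can be illustrated as a per-limit minimum. effective_available is a hypothetical helper, and treating a limit the parent does not track as unconstrained is an assumption of this sketch.

```python
from typing import Optional


def effective_available(entity: dict, parent: Optional[dict] = None) -> dict:
    """Per-limit minimum of the entity's and (if cascading) the parent's balance."""
    if parent is None:
        return dict(entity)
    # Assumption: a limit missing from the parent does not constrain the child.
    return {name: min(tokens, parent.get(name, tokens)) for name, tokens in entity.items()}


child = {"rpm": 40, "tpm": 9000}
project = {"rpm": 15, "tpm": -200}  # negative balance: the parent bucket is in debt

with_cascade = effective_available(child, project)
without_cascade = effective_available(child)
```

The negative tpm value carries through unchanged, which matches the note that the result can be negative when a bucket is in debt.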

time_until_available async

time_until_available(entity_id, resource, needed, limits=None, use_stored_limits=False)

Calculate seconds until requested capacity is available.

Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System. If no stored limits are found, falls back to the limits parameter.

Parameters:

Name Type Description Default
entity_id str

Entity to check

required
resource str

Resource to check

required
needed dict[str, int]

Required amounts by limit name

required
limits list[Limit] | None

Override limits (optional, falls back to stored config)

None
use_stored_limits bool

DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0.

False

Returns:

Type Description
float

Seconds until available (0.0 if already available)

Raises:

Type Description
ValidationError

If no limits found at any level and no override provided
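For a token bucket, the wait computed by time_until_available() follows from the shortfall and the refill rate. The sketch below assumes a constant refill rate per limit and takes the longest wait across limits; seconds_until and its arguments are hypothetical and do not reflect the library's internals.

```python
def seconds_until(needed: dict, available: dict, refill_per_sec: dict) -> float:
    """Longest wait, across limits, for each deficit to refill (0.0 if none)."""
    wait = 0.0
    for name, amount in needed.items():
        deficit = amount - available[name]
        if deficit > 0:
            wait = max(wait, deficit / refill_per_sec[name])
    return wait


rates = {"rpm": 1.0, "tpm": 100.0}  # tokens per second, per limit

short_on_tpm = seconds_until({"rpm": 5, "tpm": 1000}, {"rpm": 10, "tpm": 400}, rates)
already_ok = seconds_until({"rpm": 5}, {"rpm": 10, "tpm": 400}, rates)
in_debt = seconds_until({"tpm": 100}, {"rpm": 10, "tpm": -200}, rates)
```

A bucket in debt simply has a larger deficit, so the same formula covers the negative-balance case.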

set_limits async

set_limits(entity_id, limits, resource=DEFAULT_RESOURCE, principal=None)

Store limit configs for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to set limits for

required
limits list[Limit]

Limits to store

required
resource str

Resource these limits apply to (or default)

DEFAULT_RESOURCE
principal str | None

Caller identity for audit logging (optional)

None

get_limits async

get_limits(entity_id, resource=DEFAULT_RESOURCE)

Get stored limit configs for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to get limits for

required
resource str

Resource to get limits for

DEFAULT_RESOURCE

Returns:

Type Description
list[Limit]

List of stored limits (empty if none)

delete_limits async

delete_limits(entity_id, resource=DEFAULT_RESOURCE, principal=None)

Delete stored limit configs for an entity.

Reconciles existing buckets to fallback config (resource/system defaults) by syncing limit fields, setting TTL, and removing stale limit attributes (issue #327).

Parameters:

Name Type Description Default
entity_id str

Entity to delete limits for

required
resource str

Resource to delete limits for

DEFAULT_RESOURCE
principal str | None

Caller identity for audit logging (optional)

None

list_entities_with_custom_limits async

list_entities_with_custom_limits(resource, limit=None, cursor=None)

List all entities that have custom limit configurations.

Uses GSI3 sparse index for efficient queries. Only entities with custom limits for the specified resource are returned.

Parameters:

Name Type Description Default
resource str

Resource to filter by.

required
limit int | None

Maximum number of entities to return. None for all.

None
cursor str | None

Pagination cursor from previous call.

None

Returns:

Type Description
tuple[list[str], str | None]

Tuple of (entity_ids, next_cursor). next_cursor is None if no more results.

Example

# Get all entities with custom limits for gpt-4
entities, cursor = await limiter.list_entities_with_custom_limits("gpt-4")
for entity_id in entities:
    print(entity_id)

# Paginate through results
while cursor:
    more, cursor = await limiter.list_entities_with_custom_limits(
        "gpt-4", cursor=cursor
    )
    entities.extend(more)

list_resources_with_entity_configs async

list_resources_with_entity_configs()

List all resources that have entity-level custom limit configurations.

Uses the entity config resources registry for efficient O(1) lookup.

Returns:

Type Description
list[str]

Sorted list of resource names with at least one entity having custom limits

Example

resources = await limiter.list_resources_with_entity_configs()
for resource in resources:
    entities, _ = await limiter.list_entities_with_custom_limits(resource)
    print(f"{resource}: {len(entities)} entities with custom limits")

set_resource_defaults async

set_resource_defaults(resource, limits, principal=None)

Store default limit configs for a resource.

Resource defaults override system defaults for the specified resource.

Parameters:

Name Type Description Default
resource str

Resource name

required
limits list[Limit]

Limits to store

required
principal str | None

Caller identity for audit logging (optional)

None

get_resource_defaults async

get_resource_defaults(resource)

Get stored default limit configs for a resource.

Parameters:

Name Type Description Default
resource str

Resource name

required

Returns:

Type Description
list[Limit]

List of stored limits (empty if none)

delete_resource_defaults async

delete_resource_defaults(resource, principal=None)

Delete stored default limit configs for a resource.

Parameters:

Name Type Description Default
resource str

Resource name

required
principal str | None

Caller identity for audit logging (optional)

None

list_resources_with_defaults async

list_resources_with_defaults()

List all resources that have default limit configs.

set_system_defaults async

set_system_defaults(limits, on_unavailable=None, principal=None)

Store system-wide default limits and config.

System defaults apply to ALL resources unless overridden at resource or entity level.

Parameters:

Name Type Description Default
limits list[Limit]

Limits to store (apply globally to all resources)

required
on_unavailable OnUnavailable | None

Behavior when DynamoDB unavailable (optional)

None
principal str | None

Caller identity for audit logging (optional)

None

get_system_defaults async

get_system_defaults()

Get system-wide default limits and config.

Returns:

Type Description
tuple[list[Limit], OnUnavailable | None]

Tuple of (limits, on_unavailable). on_unavailable may be None if not set.

delete_system_defaults async

delete_system_defaults(principal=None)

Delete all system-wide default limits and config.

Parameters:

Name Type Description Default
principal str | None

Caller identity for audit logging (optional)

None

get_resource_capacity async

get_resource_capacity(resource, limit_name, parents_only=False)

Get aggregated capacity for a resource across all entities.

Parameters:

Name Type Description Default
resource str

Resource to query

required
limit_name str

Limit name to query

required
parents_only bool

If True, only include parent entities

False

Returns:

Type Description
ResourceCapacity

ResourceCapacity with aggregated data

SyncRateLimiter

SyncRateLimiter

SyncRateLimiter(repository=None, name=None, region=None, endpoint_url=None, stack_options=None, on_unavailable=_UNSET, auto_update=_UNSET, bucket_ttl_refill_multiplier=_UNSET, speculative_writes=True)

Synchronous rate limiter backed by DynamoDB.

Implements token bucket algorithm with support for:

- Multiple limits per entity/resource
- Two-level hierarchy (parent/child entities)
- Cascade mode (consume from entity + parent)
- Stored limit configs
- Usage analytics

Example (new API - preferred):

from zae_limiter import SyncRateLimiter, SyncRepository, StackOptions

repo = SyncRepository(
    name="my-app",
    region="us-east-1",
    stack_options=StackOptions(),
)
limiter = SyncRateLimiter(repository=repo)

Example (old API - deprecated):

limiter = SyncRateLimiter(
    name="my-app",
    region="us-east-1",
    stack_options=StackOptions(),
)

Parameters:

Name Type Description Default
repository SyncRepositoryProtocol | None

SyncRepository instance (new API, preferred). Pass a SyncRepository or any SyncRepositoryProtocol implementation.

None
name str | None

DEPRECATED. Use SyncRepository(name=...) instead.

None
region str | None

DEPRECATED. Use SyncRepository(region=...) instead.

None
endpoint_url str | None

DEPRECATED. Use SyncRepository(endpoint_url=...) instead.

None
stack_options StackOptions | None

DEPRECATED. Use SyncRepository(stack_options=...) instead.

None
on_unavailable OnUnavailable | Any

DEPRECATED. Use set_system_defaults(on_unavailable=...) or pass on_unavailable= to acquire() instead.

_UNSET
auto_update bool | Any

DEPRECATED. Use SyncRepository.builder(...).auto_update().build() instead.

_UNSET
bucket_ttl_refill_multiplier int | Any

DEPRECATED. Use SyncRepository.builder(...).bucket_ttl_multiplier().build() instead.

_UNSET
speculative_writes bool

Enable speculative UpdateItem fast path. When True, acquire() tries a speculative write first, falling back to the full read-write path only when needed.

True

Raises:

Type Description
ValueError

If both repository and name/region/endpoint_url/stack_options are provided.

name property

name

DEPRECATED. Use repository.stack_name instead.

stack_name property

stack_name

DEPRECATED. Use repository.stack_name instead.

table_name property

table_name

DEPRECATED. Use repository.stack_name instead.

list_deployed classmethod

list_deployed(region=None, endpoint_url=None)

List all deployed rate limiter instances in a region.

This is a class method that discovers existing deployments without requiring an initialized SyncRateLimiter instance. It queries CloudFormation for stacks tagged with ManagedBy=zae-limiter.

Parameters:

Name Type Description Default
region str | None

AWS region (default: use boto3 defaults)

None
endpoint_url str | None

CloudFormation endpoint (for LocalStack)

None

Returns:

Type Description
list[LimiterInfo]

List of LimiterInfo objects describing deployed instances, sorted by user-friendly name. Excludes deleted stacks.

Example

# Discover all limiters in us-east-1
limiters = SyncRateLimiter.list_deployed(region="us-east-1")
for limiter in limiters:
    if limiter.is_healthy:
        print(f"✓ {limiter.user_name}: {limiter.version}")
    elif limiter.is_failed:
        print(f"✗ {limiter.user_name}: {limiter.stack_status}")

Raises:

Type Description
ClientError

If CloudFormation API call fails

close

close()

Close the underlying connections.

is_available

is_available(timeout=1.0)

Check if the rate limiter backend (DynamoDB) is reachable.

Performs a lightweight health check without requiring initialization. This method never raises exceptions - it returns False on any error.

Parameters:

Name Type Description Default
timeout float

Maximum time in seconds to wait for response (default: 1.0)

1.0

Returns:

Type Description
bool

True if DynamoDB table is reachable, False otherwise.

Example

limiter = SyncRateLimiter(name="my-app", region="us-east-1")
if limiter.is_available():
    with limiter.acquire(...) as lease:
        ...
else:
    # Handle degraded mode
    pass

create_entity

create_entity(entity_id, name=None, parent_id=None, cascade=False, metadata=None, principal=None)

Create a new entity.

Parameters:

Name Type Description Default
entity_id str

Unique identifier for the entity

required
name str | None

Human-readable name (defaults to entity_id)

None
parent_id str | None

Parent entity ID (None for root/project entities)

None
cascade bool

If True, acquire() will also consume from parent entity

False
metadata dict[str, str] | None

Additional metadata to store

None
principal str | None

Caller identity for audit logging (optional)

None

Returns:

Type Description
Entity

The created Entity

Raises:

Type Description
EntityExistsError

If entity already exists

get_entity

get_entity(entity_id)

Get an entity by ID.

delete_entity

delete_entity(entity_id, principal=None)

Delete an entity and all its related data.

Parameters:

Name Type Description Default
entity_id str

ID of the entity to delete

required
principal str | None

Caller identity for audit logging (optional)

None

get_children

get_children(parent_id)

Get all children of a parent entity.

get_audit_events

get_audit_events(entity_id, limit=100, start_event_id=None)

Get audit events for an entity.

Retrieves security audit events logged for administrative operations on the specified entity, ordered by most recent first.

Parameters:

Name Type Description Default
entity_id str

ID of the entity to query

required
limit int

Maximum number of events to return (default: 100)

100
start_event_id str | None

Event ID to start after (for pagination)

None

Returns:

Type Description
list[AuditEvent]

List of AuditEvent objects, ordered by most recent first

Example

events = limiter.get_audit_events("proj-1")
for event in events:
    print(f"{event.timestamp}: {event.action} by {event.principal}")

get_usage_snapshots

get_usage_snapshots(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None, limit=100, next_key=None)

Query usage snapshots for historical consumption data.

Usage snapshots are created by the aggregator Lambda from DynamoDB stream events. They track token consumption per entity/resource within time windows (hourly, daily).

Supports two query modes:

1. Entity-scoped: Provide entity_id (optionally with resource filter)
2. Resource-scoped: Provide resource to query across all entities

Parameters:

Name Type Description Default
entity_id str | None

Entity to query (uses primary key)

None
resource str | None

Resource name filter (required if entity_id is None)

None
window_type str | None

Filter by window type ("hourly", "daily")

None
start_time datetime | None

Filter snapshots >= this timestamp

None
end_time datetime | None

Filter snapshots <= this timestamp

None
limit int

Maximum items to fetch from DynamoDB per page (default: 100)

100
next_key dict[str, Any] | None

Pagination cursor from previous call

None

Returns:

Type Description
tuple[list[UsageSnapshot], dict[str, Any] | None]

Tuple of (snapshots, next_key). next_key is None if no more results.

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

Note

The limit parameter controls the DynamoDB query batch size. Client-side filters (window_type, start_time, end_time) are applied after fetching, so the returned count may be less than limit. Use next_key to paginate through all matching results.

Example

# Get hourly snapshots for an entity
snapshots, cursor = limiter.get_usage_snapshots(
    entity_id="user-123",
    resource="gpt-4",
    window_type="hourly",
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 31),
)
for snap in snapshots:
    print(f"{snap.window_start}: {snap.counters}")

# Paginate through results
while cursor:
    more, cursor = limiter.get_usage_snapshots(
        entity_id="user-123",
        next_key=cursor,
    )

get_usage_summary

get_usage_summary(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None)

Get aggregated usage summary across multiple snapshots.

Fetches all matching snapshots and computes total and average consumption statistics. Useful for billing, reporting, and capacity planning.

Parameters:

Name Type Description Default
entity_id str | None

Entity to query

None
resource str | None

Resource name filter (required if entity_id is None)

None
window_type str | None

Filter by window type ("hourly", "daily")

None
start_time datetime | None

Filter snapshots >= this timestamp

None
end_time datetime | None

Filter snapshots <= this timestamp

None

Returns:

Type Description
UsageSummary

UsageSummary with total and average consumption per limit type

Raises:

Type Description
ValueError

If neither entity_id nor resource is provided

Example

summary = limiter.get_usage_summary(
    entity_id="user-123",
    resource="gpt-4",
    window_type="hourly",
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 31),
)
print(f"Total tokens: {summary.total.get('tpm', 0)}")
print(f"Average per hour: {summary.average.get('tpm', 0.0):.1f}")
print(f"Snapshots: {summary.snapshot_count}")

acquire

acquire(entity_id, resource, consume, limits=None, use_stored_limits=False, on_unavailable=None)

Acquire rate limit capacity.

Limits are resolved automatically from stored config using a four-tier hierarchy: Entity > Entity Default > Resource > System. Pass limits to override.

Cascade behavior is controlled by the entity's cascade flag, set at entity creation time via create_entity(cascade=True). When enabled, acquire() automatically consumes from both the entity and its parent.

Parameters:

Name Type Description Default
entity_id str

Entity to acquire capacity for

required
resource str

Resource being accessed (e.g., "gpt-4")

required
consume dict[str, int]

Amounts to consume by limit name

required
limits list[Limit] | None

Override stored config with explicit limits (optional)

None
use_stored_limits bool

DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0.

False
on_unavailable OnUnavailable | None

Override default on_unavailable behavior

None

Yields:

Type Description
SyncLease

SyncLease for managing additional consumption

Raises:

Type Description
RateLimitExceeded

If any limit would be exceeded

RateLimiterUnavailable

If DynamoDB is unavailable and on_unavailable is BLOCK

ValidationError

If no limits configured at any level

available

available(entity_id, resource, limits=None, use_stored_limits=False)

Check available capacity without consuming.

Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System. If no stored limits are found, falls back to the limits parameter.

Returns minimum available across entity (and parent if cascade). Can return negative values if bucket is in debt.

Parameters:

Name Type Description Default
entity_id str

Entity to check

required
resource str

Resource to check

required
limits list[Limit] | None

Override limits (optional, falls back to stored config)

None
use_stored_limits bool

DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0.

False

Returns:

Type Description
dict[str, int]

Dict mapping limit_name -> available tokens

Raises:

Type Description
ValidationError

If no limits found at any level and no override provided

time_until_available

time_until_available(entity_id, resource, needed, limits=None, use_stored_limits=False)

Calculate seconds until requested capacity is available.

Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System. If no stored limits are found, falls back to the limits parameter.

Parameters:

Name Type Description Default
entity_id str

Entity to check

required
resource str

Resource to check

required
needed dict[str, int]

Required amounts by limit name

required
limits list[Limit] | None

Override limits (optional, falls back to stored config)

None
use_stored_limits bool

DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0.

False

Returns:

Type Description
float

Seconds until available (0.0 if already available)

Raises:

Type Description
ValidationError

If no limits found at any level and no override provided

set_limits

set_limits(entity_id, limits, resource=DEFAULT_RESOURCE, principal=None)

Store limit configs for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to set limits for

required
limits list[Limit]

Limits to store

required
resource str

Resource these limits apply to (or default)

DEFAULT_RESOURCE
principal str | None

Caller identity for audit logging (optional)

None

get_limits

get_limits(entity_id, resource=DEFAULT_RESOURCE)

Get stored limit configs for an entity.

Parameters:

Name Type Description Default
entity_id str

Entity to get limits for

required
resource str

Resource to get limits for

DEFAULT_RESOURCE

Returns:

Type Description
list[Limit]

List of stored limits (empty if none)

delete_limits

delete_limits(entity_id, resource=DEFAULT_RESOURCE, principal=None)

Delete stored limit configs for an entity.

Reconciles existing buckets to fallback config (resource/system defaults) by syncing limit fields, setting TTL, and removing stale limit attributes (issue #327).

Parameters:

Name Type Description Default
entity_id str

Entity to delete limits for

required
resource str

Resource to delete limits for

DEFAULT_RESOURCE
principal str | None

Caller identity for audit logging (optional)

None

list_entities_with_custom_limits

list_entities_with_custom_limits(resource, limit=None, cursor=None)

List all entities that have custom limit configurations.

Uses GSI3 sparse index for efficient queries. Only entities with custom limits for the specified resource are returned.

Parameters:

Name Type Description Default
resource str

Resource to filter by.

required
limit int | None

Maximum number of entities to return. None for all.

None
cursor str | None

Pagination cursor from previous call.

None

Returns:

Type Description
tuple[list[str], str | None]

Tuple of (entity_ids, next_cursor). next_cursor is None if no more results.

Example

# Get all entities with custom limits for gpt-4
entities, cursor = limiter.list_entities_with_custom_limits("gpt-4")
for entity_id in entities:
    print(entity_id)

# Paginate through results
while cursor:
    more, cursor = limiter.list_entities_with_custom_limits(
        "gpt-4", cursor=cursor
    )
    entities.extend(more)
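In synchronous code, the cursor loop above can be wrapped in a generator so callers iterate lazily instead of building the full list first. This is a sketch, not part of the library: iter_entities is a hypothetical helper, and fake_list_page with its numeric-string cursor is an in-memory stand-in used only so the example runs on its own.

```python
from typing import Callable, Iterator, List, Optional, Tuple

Page = Tuple[List[str], Optional[str]]


def iter_entities(list_page: Callable[[Optional[str]], Page]) -> Iterator[str]:
    """Yield entity IDs page by page until the cursor is None."""
    cursor = None
    while True:
        entity_ids, cursor = list_page(cursor)
        yield from entity_ids
        if cursor is None:
            return


# In-memory stand-in for a paged listing call (hypothetical data and cursor format).
_DATA = ["user-1", "user-2", "user-3", "user-4", "user-5"]


def fake_list_page(cursor: Optional[str], page_size: int = 2) -> Page:
    start = int(cursor) if cursor else 0
    end = start + page_size
    next_cursor = str(end) if end < len(_DATA) else None
    return _DATA[start:end], next_cursor


all_ids = list(iter_entities(fake_list_page))
```

With a real limiter, the callable could be `lambda cursor: limiter.list_entities_with_custom_limits("gpt-4", cursor=cursor)`, which matches the signature documented above.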

list_resources_with_entity_configs

list_resources_with_entity_configs()

List all resources that have entity-level custom limit configurations.

Uses the entity config resources registry for efficient O(1) lookup.

Returns:

Type Description
list[str]

Sorted list of resource names with at least one entity having custom limits

Example

resources = limiter.list_resources_with_entity_configs()
for resource in resources:
    entities, _ = limiter.list_entities_with_custom_limits(resource)
    print(f"{resource}: {len(entities)} entities with custom limits")

set_resource_defaults

set_resource_defaults(resource, limits, principal=None)

Store default limit configs for a resource.

Resource defaults override system defaults for the specified resource.

Parameters:

Name Type Description Default
resource str

Resource name

required
limits list[Limit]

Limits to store

required
principal str | None

Caller identity for audit logging (optional)

None

get_resource_defaults

get_resource_defaults(resource)

Get stored default limit configs for a resource.

Parameters:

Name Type Description Default
resource str

Resource name

required

Returns:

Type Description
list[Limit]

List of stored limits (empty if none)

delete_resource_defaults

delete_resource_defaults(resource, principal=None)

Delete stored default limit configs for a resource.

Parameters:

Name Type Description Default
resource str

Resource name

required
principal str | None

Caller identity for audit logging (optional)

None

list_resources_with_defaults

list_resources_with_defaults()

List all resources that have default limit configs.

set_system_defaults

set_system_defaults(limits, on_unavailable=None, principal=None)

Store system-wide default limits and config.

System defaults apply to ALL resources unless overridden at resource or entity level.

Parameters:

Name Type Description Default
limits list[Limit]

Limits to store (apply globally to all resources)

required
on_unavailable OnUnavailable | None

Behavior when DynamoDB unavailable (optional)

None
principal str | None

Caller identity for audit logging (optional)

None

get_system_defaults

get_system_defaults()

Get system-wide default limits and config.

Returns:

Type Description
tuple[list[Limit], OnUnavailable | None]

Tuple of (limits, on_unavailable). on_unavailable may be None if not set.

delete_system_defaults

delete_system_defaults(principal=None)

Delete all system-wide default limits and config.

Parameters:

Name Type Description Default
principal str | None

Caller identity for audit logging (optional)

None

get_resource_capacity

get_resource_capacity(resource, limit_name, parents_only=False)

Get aggregated capacity for a resource across all entities.

Parameters:

Name Type Description Default
resource str

Resource to query

required
limit_name str

Limit name to query

required
parents_only bool

If True, only include parent entities

False

Returns:

Type Description
ResourceCapacity

ResourceCapacity with aggregated data

OnUnavailable

OnUnavailable

Bases: Enum

Behavior when DynamoDB is unavailable.
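The choice this enum controls is the usual trade-off between failing closed and failing open. The sketch below illustrates that trade-off only. This page names just BLOCK, so FailureMode.ALLOW is an assumed counterpart, and FailureMode, BackendUnavailable, and guard are hypothetical names rather than the library's API.

```python
from enum import Enum


class FailureMode(Enum):
    BLOCK = "block"  # fail closed: reject when the backend cannot be reached
    ALLOW = "allow"  # fail open (assumed counterpart, not named on this page)


class BackendUnavailable(Exception):
    """Stand-in for the library's RateLimiterUnavailable."""


def guard(check_backend, mode: FailureMode) -> bool:
    """Return True if the request may proceed."""
    try:
        return check_backend()
    except ConnectionError as exc:
        if mode is FailureMode.BLOCK:
            raise BackendUnavailable("rate limiter backend unreachable") from exc
        return True  # fail open: let the request through unmetered


def backend_down() -> bool:
    raise ConnectionError("timeout")


allowed = guard(backend_down, FailureMode.ALLOW)

try:
    guard(backend_down, FailureMode.BLOCK)
    blocked = False
except BackendUnavailable:
    blocked = True
```

Failing closed protects the downstream resource at the cost of availability; failing open does the reverse.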