RateLimiter
The main rate limiter classes for async and sync usage.
RateLimiter (Async)
RateLimiter
RateLimiter(repository=None, name=None, region=None, endpoint_url=None, stack_options=None, on_unavailable=_UNSET, auto_update=_UNSET, bucket_ttl_refill_multiplier=_UNSET, speculative_writes=True)
Async rate limiter backed by DynamoDB.
Implements the token bucket algorithm with support for:
- Multiple limits per entity/resource
- Two-level hierarchy (parent/child entities)
- Cascade mode (consume from entity + parent)
- Stored limit configs
- Usage analytics
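For readers new to the token bucket algorithm, the following is a minimal in-memory sketch of the idea. It is illustrative only and is not the library's DynamoDB-backed implementation; the `TokenBucket` class and its field names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class TokenBucket:
    """Illustrative in-memory token bucket (hypothetical, not the library's code)."""

    capacity: float        # maximum number of tokens the bucket can hold
    refill_per_sec: float  # tokens added per second
    tokens: float          # current balance
    updated_at: float      # time of the last refill, in seconds

    def refill(self, now: float) -> None:
        # Add tokens for the elapsed time, capped at capacity.
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.updated_at = now

    def try_consume(self, amount: float, now: float) -> bool:
        # Refill first, then consume only if enough tokens are present.
        self.refill(now)
        if self.tokens < amount:
            return False
        self.tokens -= amount
        return True


bucket = TokenBucket(capacity=10, refill_per_sec=2, tokens=10, updated_at=0.0)
first = bucket.try_consume(8, now=0.0)   # 10 tokens available, 2 remain
second = bucket.try_consume(5, now=1.0)  # refilled to 4, request for 5 is rejected
third = bucket.try_consume(5, now=2.0)   # refilled to 6, 1 remains
```

Each limit on an entity/resource pair can be thought of as one such bucket; a request succeeds only if every bucket it touches has enough tokens.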
Example (new API - preferred):

    from zae_limiter import RateLimiter, Repository, StackOptions

    repo = Repository(
        name="my-app",
        region="us-east-1",
        stack_options=StackOptions(),
    )
    limiter = RateLimiter(repository=repo)

Example (old API - deprecated):

    limiter = RateLimiter(
        name="my-app",
        region="us-east-1",
        stack_options=StackOptions(),
    )

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `repository` | `RepositoryProtocol \| None` | Repository instance (new API, preferred). Pass a Repository or any RepositoryProtocol implementation. | `None` |
| `name` | `str \| None` | DEPRECATED. Use | `None` |
| `region` | `str \| None` | DEPRECATED. Use | `None` |
| `endpoint_url` | `str \| None` | DEPRECATED. Use | `None` |
| `stack_options` | `StackOptions \| None` | DEPRECATED. Use | `None` |
| `on_unavailable` | `OnUnavailable \| Any` | DEPRECATED. Use | `_UNSET` |
| `auto_update` | `bool \| Any` | DEPRECATED. Use | `_UNSET` |
| `bucket_ttl_refill_multiplier` | `int \| Any` | DEPRECATED. Use | `_UNSET` |
| `speculative_writes` | `bool` | Enable speculative UpdateItem fast path. When True, acquire() tries a speculative write first, falling back to the full read-write path only when needed. | `True` |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If both repository and name/region/endpoint_url/stack_options are provided. |
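The speculative fast path described for `speculative_writes` follows a common pattern: attempt a conditional write first, and do the more expensive read-then-write only if the condition fails. The sketch below simulates that pattern with a plain dict. It is an assumption about the general technique, not the library's actual logic; `conditional_consume`, `ConditionFailed`, and the slow-path behavior are all hypothetical.

```python
class ConditionFailed(Exception):
    """Raised when the conditional write's precondition does not hold."""


def conditional_consume(table: dict, key: str, amount: int) -> None:
    """Fast path: decrement only if the bucket exists and holds enough tokens."""
    tokens = table.get(key)
    if tokens is None or tokens < amount:
        raise ConditionFailed(key)
    table[key] = tokens - amount


def acquire(table: dict, key: str, amount: int, capacity: int) -> str:
    """Return which path served the request, or 'rejected'."""
    try:
        conditional_consume(table, key, amount)
        return "fast"
    except ConditionFailed:
        pass
    # Slow path (assumed): read state, create the bucket if missing, then decide.
    tokens = table.get(key, capacity)
    if tokens < amount:
        return "rejected"
    table[key] = tokens - amount
    return "slow"


table = {"user-1": 5}
outcomes = [
    acquire(table, "user-1", 3, capacity=10),  # bucket exists with enough tokens
    acquire(table, "user-2", 3, capacity=10),  # bucket missing, created on the slow path
    acquire(table, "user-1", 3, capacity=10),  # only 2 tokens left
]
```

The appeal of the pattern is that the common case (bucket exists, capacity available) costs a single write instead of a read followed by a write.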
list_deployed (async, classmethod)
List all deployed rate limiter instances in a region.
This class method discovers existing deployments without requiring an initialized RateLimiter instance. It queries CloudFormation for stacks tagged with ManagedBy=zae-limiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `region` | `str \| None` | AWS region (default: use boto3 defaults) | `None` |
| `endpoint_url` | `str \| None` | CloudFormation endpoint (for LocalStack) | `None` |

Returns:

| Type | Description |
|---|---|
| `list[LimiterInfo]` | List of LimiterInfo objects describing deployed instances, sorted by user-friendly name. Excludes deleted stacks. |

Example

    # Discover all limiters in us-east-1
    limiters = await RateLimiter.list_deployed(region="us-east-1")
    for limiter in limiters:
        if limiter.is_healthy:
            print(f"✓ {limiter.user_name}: {limiter.version}")
        elif limiter.is_failed:
            print(f"✗ {limiter.user_name}: {limiter.stack_status}")

Raises:

| Type | Description |
|---|---|
| `ClientError` | If the CloudFormation API call fails |
is_available (async)
Check if the rate limiter backend (DynamoDB) is reachable.
Performs a lightweight health check without requiring initialization. This method never raises exceptions; it returns False on any error.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Maximum time in seconds to wait for a response | `1.0` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if the DynamoDB table is reachable, False otherwise. |

Example

    limiter = RateLimiter(name="my-app", region="us-east-1")
    if await limiter.is_available():
        async with limiter.acquire(...) as lease:
            ...
    else:
        # Handle degraded mode
        pass

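The "never raises, returns False on any error" contract can be reproduced in application code with a timeout wrapper. The sketch below shows one way to do that with `asyncio.wait_for`; it is not the library's implementation, and `is_reachable` and the probe functions are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


async def is_reachable(
    probe: Callable[[], Awaitable[object]], timeout: float = 1.0
) -> bool:
    """Return True if `probe` finishes within `timeout`, False on timeout or error."""
    try:
        await asyncio.wait_for(probe(), timeout=timeout)
        return True
    except Exception:  # covers timeouts and backend errors; cancellation still propagates
        return False


async def _fast() -> None:
    await asyncio.sleep(0)


async def _slow() -> None:
    await asyncio.sleep(1.0)


async def _broken() -> None:
    raise ConnectionError("backend unreachable")


async def _demo() -> tuple:
    return (
        await is_reachable(_fast),
        await is_reachable(_slow, timeout=0.05),
        await is_reachable(_broken),
    )


results = asyncio.run(_demo())
```

Catching `Exception` rather than `BaseException` keeps task cancellation working as expected while still swallowing timeouts and connection errors.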
create_entity (async)
Create a new entity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier for the entity | required |
| `name` | `str \| None` | Human-readable name (defaults to entity_id) | `None` |
| `parent_id` | `str \| None` | Parent entity ID (None for root/project entities) | `None` |
| `cascade` | `bool` | If True, acquire() will also consume from the parent entity | `False` |
| `metadata` | `dict[str, str] \| None` | Additional metadata to store | `None` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |

Returns:

| Type | Description |
|---|---|
| `Entity` | The created Entity |

Raises:

| Type | Description |
|---|---|
| `EntityExistsError` | If the entity already exists |
delete_entity (async)
Delete an entity and all its related data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | ID of the entity to delete | required |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_audit_events (async)
Get audit events for an entity.
Retrieves security audit events logged for administrative operations on the specified entity, ordered by most recent first.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | ID of the entity to query | required |
| `limit` | `int` | Maximum number of events to return | `100` |
| `start_event_id` | `str \| None` | Event ID to start after (for pagination) | `None` |

Returns:

| Type | Description |
|---|---|
| `list[AuditEvent]` | List of AuditEvent objects, ordered by most recent first |

Example

    events = await limiter.get_audit_events("proj-1")
    for event in events:
        print(f"{event.timestamp}: {event.action} by {event.principal}")

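Paging with `start_event_id` means passing the ID of the last event seen as the starting point of the next call. The sketch below demonstrates that loop against an in-memory stand-in. The `get_events` stub and the `event_id` field name are assumptions made for illustration; check the AuditEvent model for the actual attribute.

```python
import asyncio
from typing import Optional

# Hypothetical stored events, newest first: evt-005 ... evt-001.
_EVENTS = [{"event_id": f"evt-{n:03d}"} for n in range(5, 0, -1)]


async def get_events(limit: int = 100, start_event_id: Optional[str] = None) -> list:
    """Stand-in for an audit query: return up to `limit` events after the given ID."""
    start = 0
    if start_event_id is not None:
        ids = [event["event_id"] for event in _EVENTS]
        start = ids.index(start_event_id) + 1
    return _EVENTS[start : start + limit]


async def all_events(page_size: int = 2) -> list:
    """Collect every event by feeding the last seen ID into the next call."""
    collected = []
    start_after = None
    while True:
        page = await get_events(limit=page_size, start_event_id=start_after)
        collected.extend(page)
        if len(page) < page_size:  # a short page means there is nothing left
            break
        start_after = page[-1]["event_id"]
    return collected


event_ids = [event["event_id"] for event in asyncio.run(all_events(page_size=2))]
```

When the total is an exact multiple of the page size, the loop makes one extra call that returns an empty page and then stops.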
get_usage_snapshots (async)
get_usage_snapshots(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None, limit=100, next_key=None)
Query usage snapshots for historical consumption data.
Usage snapshots are created by the aggregator Lambda from DynamoDB stream events. They track token consumption per entity/resource within time windows (hourly, daily).
Supports two query modes:
1. Entity-scoped: Provide entity_id (optionally with a resource filter)
2. Resource-scoped: Provide resource to query across all entities

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str \| None` | Entity to query (uses primary key) | `None` |
| `resource` | `str \| None` | Resource name filter (required if entity_id is None) | `None` |
| `window_type` | `str \| None` | Filter by window type ("hourly", "daily") | `None` |
| `start_time` | `datetime \| None` | Filter snapshots >= this timestamp | `None` |
| `end_time` | `datetime \| None` | Filter snapshots <= this timestamp | `None` |
| `limit` | `int` | Maximum items to fetch from DynamoDB per page | `100` |
| `next_key` | `dict[str, Any] \| None` | Pagination cursor from previous call | `None` |

Returns:

| Type | Description |
|---|---|
| `tuple[list[UsageSnapshot], dict[str, Any] \| None]` | Tuple of (snapshots, next_key). next_key is None if no more results. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither entity_id nor resource is provided |

Note
The limit parameter controls the DynamoDB query batch size. Client-side filters (window_type, start_time, end_time) are applied after fetching, so the returned count may be less than limit. Use next_key to paginate through all matching results.
Example

    from datetime import datetime

    # Get hourly snapshots for an entity
    snapshots, cursor = await limiter.get_usage_snapshots(
        entity_id="user-123",
        resource="gpt-4",
        window_type="hourly",
        start_time=datetime(2024, 1, 1),
        end_time=datetime(2024, 1, 31),
    )
    for snap in snapshots:
        print(f"{snap.window_start}: {snap.counters}")

    # Paginate through results
    while cursor:
        more, cursor = await limiter.get_usage_snapshots(
            entity_id="user-123",
            next_key=cursor,
        )

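The interaction between `limit` and the client-side filters is easier to see in a small simulation. In the sketch below, a hypothetical `fetch_page` function fetches a fixed-size page and only then applies the filter, so individual pages can come back short or even empty while the cursor is still set. The data and function names are invented for illustration.

```python
import asyncio
from typing import Optional

# Hypothetical stored snapshots as (window_type, tokens) pairs.
_ROWS = [("hourly", 10), ("daily", 240), ("hourly", 20), ("hourly", 30), ("daily", 480)]


async def fetch_page(
    window_type: Optional[str] = None,
    limit: int = 2,
    next_key: Optional[dict] = None,
) -> tuple:
    """Fetch `limit` rows from storage, then filter them on the client side."""
    start = next_key["offset"] if next_key else 0
    batch = _ROWS[start : start + limit]  # storage-level page
    end = start + len(batch)
    cursor = {"offset": end} if end < len(_ROWS) else None
    if window_type is not None:  # filter applied after fetching
        batch = [row for row in batch if row[0] == window_type]
    return batch, cursor


async def fetch_all(window_type: Optional[str] = None, limit: int = 2) -> tuple:
    """Follow the cursor until it is None, recording the size of each page."""
    items, page_sizes = [], []
    page, cursor = await fetch_page(window_type=window_type, limit=limit)
    items.extend(page)
    page_sizes.append(len(page))
    while cursor:
        page, cursor = await fetch_page(
            window_type=window_type, limit=limit, next_key=cursor
        )
        items.extend(page)
        page_sizes.append(len(page))
    return items, page_sizes


hourly, page_sizes = asyncio.run(fetch_all(window_type="hourly", limit=2))
```

The practical consequence is that an empty page does not mean the end of the data; only a `None` cursor does.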
get_usage_summary (async)
Get aggregated usage summary across multiple snapshots.
Fetches all matching snapshots and computes total and average consumption statistics. Useful for billing, reporting, and capacity planning.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str \| None` | Entity to query | `None` |
| `resource` | `str \| None` | Resource name filter (required if entity_id is None) | `None` |
| `window_type` | `str \| None` | Filter by window type ("hourly", "daily") | `None` |
| `start_time` | `datetime \| None` | Filter snapshots >= this timestamp | `None` |
| `end_time` | `datetime \| None` | Filter snapshots <= this timestamp | `None` |

Returns:

| Type | Description |
|---|---|
| `UsageSummary` | UsageSummary with total and average consumption per limit type |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither entity_id nor resource is provided |

Example

    from datetime import datetime

    summary = await limiter.get_usage_summary(
        entity_id="user-123",
        resource="gpt-4",
        window_type="hourly",
        start_time=datetime(2024, 1, 1),
        end_time=datetime(2024, 1, 31),
    )
    print(f"Total tokens: {summary.total.get('tpm', 0)}")
    print(f"Average per hour: {summary.average.get('tpm', 0.0):.1f}")
    print(f"Snapshots: {summary.snapshot_count}")

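To make the meaning of `total`, `average`, and `snapshot_count` concrete, here is a small sketch of how such a summary could be derived from per-snapshot counters. It assumes the average is the total divided by the number of snapshots, including snapshots where a given counter is absent; the library may define it differently. The `summarize` function is hypothetical.

```python
from collections import Counter


def summarize(snapshot_counters: list) -> tuple:
    """Return (total, average, snapshot_count) for a list of counter dicts."""
    total: Counter = Counter()
    for counters in snapshot_counters:
        total.update(counters)  # adds values per limit name
    count = len(snapshot_counters)
    # Assumption: average is total divided by the number of snapshots.
    average = {name: value / count for name, value in total.items()} if count else {}
    return dict(total), average, count


total, average, snapshot_count = summarize(
    [
        {"tpm": 1200, "rpm": 3},
        {"tpm": 800, "rpm": 1},
        {"tpm": 1000},
    ]
)
```

With hourly windows, `average["tpm"]` in this sketch reads as tokens per hour, which matches how the example above labels it.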
acquire (async)
Acquire rate limit capacity.
Limits are resolved automatically from stored config using a four-tier hierarchy: Entity > Entity Default > Resource > System. Pass limits to override.
Cascade behavior is controlled by the entity's cascade flag, set at entity creation time via create_entity(cascade=True). When enabled, acquire() automatically consumes from both the entity and its parent.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to acquire capacity for | required |
| `resource` | `str` | Resource being accessed (e.g., "gpt-4") | required |
| `consume` | `dict[str, int]` | Amounts to consume by limit name | required |
| `limits` | `list[Limit] \| None` | Override stored config with explicit limits (optional) | `None` |
| `use_stored_limits` | `bool` | DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0. | `False` |
| `on_unavailable` | `OnUnavailable \| None` | Override default on_unavailable behavior | `None` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[Lease]` | Lease for managing additional consumption |

Raises:

| Type | Description |
|---|---|
| `RateLimitExceeded` | If any limit would be exceeded |
| `RateLimiterUnavailable` | If DynamoDB is unavailable and on_unavailable is BLOCK |
| `ValidationError` | If no limits are configured at any level |
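The four-tier resolution order used by acquire() (Entity > Entity Default > Resource > System, with an explicit override taking precedence) can be sketched as a first-match lookup. The storage layout below is an assumption made for illustration: in particular, "Entity Default" is modeled as limits stored for an entity without a specific resource. The `resolve_limits` function is hypothetical, and `ValueError` stands in for the library's ValidationError.

```python
# Hypothetical stored configs, keyed the way each tier is scoped.
ENTITY_LIMITS = {("user-1", "gpt-4"): {"rpm": 100}}
ENTITY_DEFAULTS = {"user-2": {"rpm": 50}}
RESOURCE_DEFAULTS = {"gpt-4": {"rpm": 20}}
SYSTEM_DEFAULTS = {"rpm": 5}


def resolve_limits(entity_id: str, resource: str, override: dict = None) -> tuple:
    """Return (tier_name, limits) for the first tier that has a config."""
    if override is not None:
        return "override", override
    tiers = [
        ("entity", ENTITY_LIMITS.get((entity_id, resource))),
        ("entity_default", ENTITY_DEFAULTS.get(entity_id)),
        ("resource", RESOURCE_DEFAULTS.get(resource)),
        ("system", SYSTEM_DEFAULTS or None),
    ]
    for tier, limits in tiers:
        if limits:
            return tier, limits
    raise ValueError("no limits configured at any level")


resolved = [
    resolve_limits("user-1", "gpt-4"),
    resolve_limits("user-2", "gpt-4"),
    resolve_limits("user-3", "gpt-4"),
    resolve_limits("user-3", "claude"),
    resolve_limits("user-1", "gpt-4", override={"rpm": 1}),
]
```

The more specific a config is, the earlier it is consulted, so a per-entity setting always wins over resource or system defaults.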
available (async)
Check available capacity without consuming.
Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System.
If no stored limits are found, falls back to the limits parameter.
Returns the minimum available across the entity (and its parent if cascade is enabled). Can return negative values if a bucket is in debt.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to check | required |
| `resource` | `str` | Resource to check | required |
| `limits` | `list[Limit] \| None` | Override limits (optional, falls back to stored config) | `None` |
| `use_stored_limits` | `bool` | DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0. | `False` |

Returns:

| Type | Description |
|---|---|
| `dict[str, int]` | Dict mapping limit_name -> available tokens |

Raises:

| Type | Description |
|---|---|
| `ValidationError` | If no limits are found at any level and no override is provided |
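The "minimum across entity and parent" rule from available() can be illustrated with plain dicts. This is a sketch of the stated behavior, not the library's code; `combined_available` is a hypothetical helper, and it assumes that a limit missing on the parent does not constrain the child.

```python
from typing import Optional


def combined_available(entity: dict, parent: Optional[dict] = None) -> dict:
    """Per limit name, return the smaller of the entity's and the parent's balance."""
    if parent is None:  # no cascade: only the entity's own buckets count
        return dict(entity)
    return {
        name: min(value, parent.get(name, value))  # assumption: missing parent limit is ignored
        for name, value in entity.items()
    }


child = {"rpm": 40, "tpm": 9000}
project = {"rpm": 15, "tpm": -500}  # negative balance: the parent bucket is in debt

no_cascade = combined_available(child)
with_cascade = combined_available(child, project)
```

A negative result signals that a request would have to wait for the bucket to refill past zero before any capacity is available.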
time_until_available (async)
Calculate seconds until requested capacity is available.
Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System.
If no stored limits are found, falls back to the limits parameter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to check | required |
| `resource` | `str` | Resource to check | required |
| `needed` | `dict[str, int]` | Required amounts by limit name | required |
| `limits` | `list[Limit] \| None` | Override limits (optional, falls back to stored config) | `None` |
| `use_stored_limits` | `bool` | DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0. | `False` |

Returns:

| Type | Description |
|---|---|
| `float` | Seconds until available (0.0 if already available) |

Raises:

| Type | Description |
|---|---|
| `ValidationError` | If no limits are found at any level and no override is provided |
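For a token bucket, the wait time follows from the deficit and the refill rate: wait = max(0, needed - available) / refill rate, and a request spanning several limits must wait for the slowest one. The sketch below works through that arithmetic. It assumes a constant refill rate per limit; the function names and the rate values are hypothetical.

```python
def seconds_until(available: float, needed: float, refill_per_sec: float) -> float:
    """Seconds until `needed` tokens are present, given a constant refill rate."""
    deficit = needed - available
    if deficit <= 0:
        return 0.0
    if refill_per_sec <= 0:
        raise ValueError("refill rate must be positive")
    return deficit / refill_per_sec


def seconds_until_all(available: dict, needed: dict, refill_rates: dict) -> float:
    """The request can proceed only once every limit has enough tokens."""
    return max(
        (
            seconds_until(available.get(name, 0), amount, refill_rates[name])
            for name, amount in needed.items()
        ),
        default=0.0,
    )


wait = seconds_until_all(
    available={"rpm": 2, "tpm": 500},
    needed={"rpm": 1, "tpm": 2000},
    refill_rates={"rpm": 1.0, "tpm": 100.0},  # tokens per second, illustrative values
)
in_debt_wait = seconds_until(available=-100, needed=100, refill_per_sec=50.0)
```

Here the rpm limit is already satisfied, while tpm is short by 1500 tokens at 100 tokens per second, so the combined wait is driven by tpm alone.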
set_limits (async)
Store limit configs for an entity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to set limits for | required |
| `limits` | `list[Limit]` | Limits to store | required |
| `resource` | `str` | Resource these limits apply to (or default) | `DEFAULT_RESOURCE` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_limits (async)
Get stored limit configs for an entity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to get limits for | required |
| `resource` | `str` | Resource to get limits for | `DEFAULT_RESOURCE` |

Returns:

| Type | Description |
|---|---|
| `list[Limit]` | List of stored limits (empty if none) |
delete_limits (async)
Delete stored limit configs for an entity.
Reconciles existing buckets to the fallback config (resource/system defaults) by syncing limit fields, setting TTL, and removing stale limit attributes (issue #327).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to delete limits for | required |
| `resource` | `str` | Resource to delete limits for | `DEFAULT_RESOURCE` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
list_entities_with_custom_limits (async)
List all entities that have custom limit configurations.
Uses the GSI3 sparse index for efficient queries. Only entities with custom limits for the specified resource are returned.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource to filter by. | required |
| `limit` | `int \| None` | Maximum number of entities to return. None for all. | `None` |
| `cursor` | `str \| None` | Pagination cursor from previous call. | `None` |

Returns:

| Type | Description |
|---|---|
| `tuple[list[str], str \| None]` | Tuple of (entity_ids, next_cursor). next_cursor is None if no more results. |

Example

    # Get all entities with custom limits for gpt-4
    entities, cursor = await limiter.list_entities_with_custom_limits("gpt-4")
    for entity_id in entities:
        print(entity_id)

    # Paginate through results
    while cursor:
        more, cursor = await limiter.list_entities_with_custom_limits(
            "gpt-4", cursor=cursor
        )
        entities.extend(more)

list_resources_with_entity_configs (async)
List all resources that have entity-level custom limit configurations.
Uses the entity config resources registry for efficient O(1) lookup.
Returns:

| Type | Description |
|---|---|
| `list[str]` | Sorted list of resource names with at least one entity having custom limits |

Example

    resources = await limiter.list_resources_with_entity_configs()
    for resource in resources:
        entities, _ = await limiter.list_entities_with_custom_limits(resource)
        print(f"{resource}: {len(entities)} entities with custom limits")

set_resource_defaults (async)
Store default limit configs for a resource.
Resource defaults override system defaults for the specified resource.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource name | required |
| `limits` | `list[Limit]` | Limits to store | required |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_resource_defaults (async)
Get stored default limit configs for a resource.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource name | required |

Returns:

| Type | Description |
|---|---|
| `list[Limit]` | List of stored limits (empty if none) |
delete_resource_defaults (async)
Delete stored default limit configs for a resource.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource name | required |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
list_resources_with_defaults (async)
List all resources that have default limit configs.
set_system_defaults (async)
Store system-wide default limits and config.
System defaults apply to ALL resources unless overridden at resource or entity level.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `limits` | `list[Limit]` | Limits to store (apply globally to all resources) | required |
| `on_unavailable` | `OnUnavailable \| None` | Behavior when DynamoDB is unavailable (optional) | `None` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_system_defaults (async)
Get system-wide default limits and config.
Returns:

| Type | Description |
|---|---|
| `tuple[list[Limit], OnUnavailable \| None]` | Tuple of (limits, on_unavailable). on_unavailable may be None if not set. |
delete_system_defaults (async)
Delete all system-wide default limits and config.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_resource_capacity (async)
Get aggregated capacity for a resource across all entities.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource to query | required |
| `limit_name` | `str` | Limit name to query | required |
| `parents_only` | `bool` | If True, only include parent entities | `False` |

Returns:

| Type | Description |
|---|---|
| `ResourceCapacity` | ResourceCapacity with aggregated data |
SyncRateLimiter
SyncRateLimiter
SyncRateLimiter(repository=None, name=None, region=None, endpoint_url=None, stack_options=None, on_unavailable=_UNSET, auto_update=_UNSET, bucket_ttl_refill_multiplier=_UNSET, speculative_writes=True)
Synchronous rate limiter backed by DynamoDB.
Implements the token bucket algorithm with support for:
- Multiple limits per entity/resource
- Two-level hierarchy (parent/child entities)
- Cascade mode (consume from entity + parent)
- Stored limit configs
- Usage analytics
Example (new API - preferred):

    from zae_limiter import SyncRateLimiter, SyncRepository, StackOptions

    repo = SyncRepository(
        name="my-app",
        region="us-east-1",
        stack_options=StackOptions(),
    )
    limiter = SyncRateLimiter(repository=repo)

Example (old API - deprecated):

    limiter = SyncRateLimiter(
        name="my-app",
        region="us-east-1",
        stack_options=StackOptions(),
    )

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `repository` | `SyncRepositoryProtocol \| None` | SyncRepository instance (new API, preferred). Pass a SyncRepository or any SyncRepositoryProtocol implementation. | `None` |
| `name` | `str \| None` | DEPRECATED. Use | `None` |
| `region` | `str \| None` | DEPRECATED. Use | `None` |
| `endpoint_url` | `str \| None` | DEPRECATED. Use | `None` |
| `stack_options` | `StackOptions \| None` | DEPRECATED. Use | `None` |
| `on_unavailable` | `OnUnavailable \| Any` | DEPRECATED. Use | `_UNSET` |
| `auto_update` | `bool \| Any` | DEPRECATED. Use | `_UNSET` |
| `bucket_ttl_refill_multiplier` | `int \| Any` | DEPRECATED. Use | `_UNSET` |
| `speculative_writes` | `bool` | Enable speculative UpdateItem fast path. When True, acquire() tries a speculative write first, falling back to the full read-write path only when needed. | `True` |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If both repository and name/region/endpoint_url/stack_options are provided. |
list_deployed (classmethod)
List all deployed rate limiter instances in a region.
This class method discovers existing deployments without requiring an initialized SyncRateLimiter instance. It queries CloudFormation for stacks tagged with ManagedBy=zae-limiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `region` | `str \| None` | AWS region (default: use boto3 defaults) | `None` |
| `endpoint_url` | `str \| None` | CloudFormation endpoint (for LocalStack) | `None` |

Returns:

| Type | Description |
|---|---|
| `list[LimiterInfo]` | List of LimiterInfo objects describing deployed instances, sorted by user-friendly name. Excludes deleted stacks. |

Example

    # Discover all limiters in us-east-1
    limiters = SyncRateLimiter.list_deployed(region="us-east-1")
    for limiter in limiters:
        if limiter.is_healthy:
            print(f"✓ {limiter.user_name}: {limiter.version}")
        elif limiter.is_failed:
            print(f"✗ {limiter.user_name}: {limiter.stack_status}")

Raises:

| Type | Description |
|---|---|
| `ClientError` | If the CloudFormation API call fails |
is_available
Check if the rate limiter backend (DynamoDB) is reachable.
Performs a lightweight health check without requiring initialization. This method never raises exceptions; it returns False on any error.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Maximum time in seconds to wait for a response | `1.0` |

Returns:

| Type | Description |
|---|---|
| `bool` | True if the DynamoDB table is reachable, False otherwise. |

Example

    limiter = SyncRateLimiter(name="my-app", region="us-east-1")
    if limiter.is_available():
        with limiter.acquire(...) as lease:
            ...
    else:
        # Handle degraded mode
        pass

create_entity
Create a new entity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Unique identifier for the entity | required |
| `name` | `str \| None` | Human-readable name (defaults to entity_id) | `None` |
| `parent_id` | `str \| None` | Parent entity ID (None for root/project entities) | `None` |
| `cascade` | `bool` | If True, acquire() will also consume from the parent entity | `False` |
| `metadata` | `dict[str, str] \| None` | Additional metadata to store | `None` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |

Returns:

| Type | Description |
|---|---|
| `Entity` | The created Entity |

Raises:

| Type | Description |
|---|---|
| `EntityExistsError` | If the entity already exists |
delete_entity
Delete an entity and all its related data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | ID of the entity to delete | required |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_audit_events
Get audit events for an entity.
Retrieves security audit events logged for administrative operations on the specified entity, ordered by most recent first.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | ID of the entity to query | required |
| `limit` | `int` | Maximum number of events to return | `100` |
| `start_event_id` | `str \| None` | Event ID to start after (for pagination) | `None` |

Returns:

| Type | Description |
|---|---|
| `list[AuditEvent]` | List of AuditEvent objects, ordered by most recent first |

Example

    events = limiter.get_audit_events("proj-1")
    for event in events:
        print(f"{event.timestamp}: {event.action} by {event.principal}")

get_usage_snapshots
get_usage_snapshots(entity_id=None, resource=None, window_type=None, start_time=None, end_time=None, limit=100, next_key=None)
Query usage snapshots for historical consumption data.
Usage snapshots are created by the aggregator Lambda from DynamoDB stream events. They track token consumption per entity/resource within time windows (hourly, daily).
Supports two query modes:
1. Entity-scoped: Provide entity_id (optionally with a resource filter)
2. Resource-scoped: Provide resource to query across all entities

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str \| None` | Entity to query (uses primary key) | `None` |
| `resource` | `str \| None` | Resource name filter (required if entity_id is None) | `None` |
| `window_type` | `str \| None` | Filter by window type ("hourly", "daily") | `None` |
| `start_time` | `datetime \| None` | Filter snapshots >= this timestamp | `None` |
| `end_time` | `datetime \| None` | Filter snapshots <= this timestamp | `None` |
| `limit` | `int` | Maximum items to fetch from DynamoDB per page | `100` |
| `next_key` | `dict[str, Any] \| None` | Pagination cursor from previous call | `None` |

Returns:

| Type | Description |
|---|---|
| `tuple[list[UsageSnapshot], dict[str, Any] \| None]` | Tuple of (snapshots, next_key). next_key is None if no more results. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither entity_id nor resource is provided |

Note
The limit parameter controls the DynamoDB query batch size. Client-side filters (window_type, start_time, end_time) are applied after fetching, so the returned count may be less than limit. Use next_key to paginate through all matching results.
Example

    from datetime import datetime

    # Get hourly snapshots for an entity
    snapshots, cursor = limiter.get_usage_snapshots(
        entity_id="user-123",
        resource="gpt-4",
        window_type="hourly",
        start_time=datetime(2024, 1, 1),
        end_time=datetime(2024, 1, 31),
    )
    for snap in snapshots:
        print(f"{snap.window_start}: {snap.counters}")

    # Paginate through results
    while cursor:
        more, cursor = limiter.get_usage_snapshots(
            entity_id="user-123",
            next_key=cursor,
        )

get_usage_summary
Get aggregated usage summary across multiple snapshots.
Fetches all matching snapshots and computes total and average consumption statistics. Useful for billing, reporting, and capacity planning.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str \| None` | Entity to query | `None` |
| `resource` | `str \| None` | Resource name filter (required if entity_id is None) | `None` |
| `window_type` | `str \| None` | Filter by window type ("hourly", "daily") | `None` |
| `start_time` | `datetime \| None` | Filter snapshots >= this timestamp | `None` |
| `end_time` | `datetime \| None` | Filter snapshots <= this timestamp | `None` |

Returns:

| Type | Description |
|---|---|
| `UsageSummary` | UsageSummary with total and average consumption per limit type |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If neither entity_id nor resource is provided |

Example

    from datetime import datetime

    summary = limiter.get_usage_summary(
        entity_id="user-123",
        resource="gpt-4",
        window_type="hourly",
        start_time=datetime(2024, 1, 1),
        end_time=datetime(2024, 1, 31),
    )
    print(f"Total tokens: {summary.total.get('tpm', 0)}")
    print(f"Average per hour: {summary.average.get('tpm', 0.0):.1f}")
    print(f"Snapshots: {summary.snapshot_count}")

acquire
Acquire rate limit capacity.
Limits are resolved automatically from stored config using a four-tier hierarchy: Entity > Entity Default > Resource > System. Pass limits to override.
Cascade behavior is controlled by the entity's cascade flag, set at entity creation time via create_entity(cascade=True). When enabled, acquire() automatically consumes from both the entity and its parent.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to acquire capacity for | required |
| `resource` | `str` | Resource being accessed (e.g., "gpt-4") | required |
| `consume` | `dict[str, int]` | Amounts to consume by limit name | required |
| `limits` | `list[Limit] \| None` | Override stored config with explicit limits (optional) | `None` |
| `use_stored_limits` | `bool` | DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0. | `False` |
| `on_unavailable` | `OnUnavailable \| None` | Override default on_unavailable behavior | `None` |

Yields:

| Type | Description |
|---|---|
| `SyncLease` | SyncLease for managing additional consumption |

Raises:

| Type | Description |
|---|---|
| `RateLimitExceeded` | If any limit would be exceeded |
| `RateLimiterUnavailable` | If DynamoDB is unavailable and on_unavailable is BLOCK |
| `ValidationError` | If no limits are configured at any level |
available
Check available capacity without consuming.
Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System.
If no stored limits are found, falls back to the limits parameter.
Returns the minimum available across the entity (and its parent if cascade is enabled). Can return negative values if a bucket is in debt.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to check | required |
| `resource` | `str` | Resource to check | required |
| `limits` | `list[Limit] \| None` | Override limits (optional, falls back to stored config) | `None` |
| `use_stored_limits` | `bool` | DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0. | `False` |

Returns:

| Type | Description |
|---|---|
| `dict[str, int]` | Dict mapping limit_name -> available tokens |

Raises:

| Type | Description |
|---|---|
| `ValidationError` | If no limits are found at any level and no override is provided |
time_until_available
Calculate seconds until requested capacity is available.
Limits are resolved using a four-tier hierarchy: Entity > Entity Default > Resource > System.
If no stored limits are found, falls back to the limits parameter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to check | required |
| `resource` | `str` | Resource to check | required |
| `needed` | `dict[str, int]` | Required amounts by limit name | required |
| `limits` | `list[Limit] \| None` | Override limits (optional, falls back to stored config) | `None` |
| `use_stored_limits` | `bool` | DEPRECATED - limits are now always resolved from stored config. This parameter will be removed in v1.0. | `False` |

Returns:

| Type | Description |
|---|---|
| `float` | Seconds until available (0.0 if already available) |

Raises:

| Type | Description |
|---|---|
| `ValidationError` | If no limits are found at any level and no override is provided |
set_limits
Store limit configs for an entity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to set limits for | required |
| `limits` | `list[Limit]` | Limits to store | required |
| `resource` | `str` | Resource these limits apply to (or default) | `DEFAULT_RESOURCE` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_limits
Get stored limit configs for an entity.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to get limits for | required |
| `resource` | `str` | Resource to get limits for | `DEFAULT_RESOURCE` |

Returns:

| Type | Description |
|---|---|
| `list[Limit]` | List of stored limits (empty if none) |
delete_limits
Delete stored limit configs for an entity.
Reconciles existing buckets to the fallback config (resource/system defaults) by syncing limit fields, setting TTL, and removing stale limit attributes (issue #327).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `entity_id` | `str` | Entity to delete limits for | required |
| `resource` | `str` | Resource to delete limits for | `DEFAULT_RESOURCE` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
list_entities_with_custom_limits
List all entities that have custom limit configurations.
Uses the GSI3 sparse index for efficient queries. Only entities with custom limits for the specified resource are returned.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource to filter by. | required |
| `limit` | `int \| None` | Maximum number of entities to return. None for all. | `None` |
| `cursor` | `str \| None` | Pagination cursor from previous call. | `None` |

Returns:

| Type | Description |
|---|---|
| `tuple[list[str], str \| None]` | Tuple of (entity_ids, next_cursor). next_cursor is None if no more results. |

Example

    # Get all entities with custom limits for gpt-4
    entities, cursor = limiter.list_entities_with_custom_limits("gpt-4")
    for entity_id in entities:
        print(entity_id)

    # Paginate through results
    while cursor:
        more, cursor = limiter.list_entities_with_custom_limits(
            "gpt-4", cursor=cursor
        )
        entities.extend(more)

list_resources_with_entity_configs
List all resources that have entity-level custom limit configurations.
Uses the entity config resources registry for efficient O(1) lookup.
Returns:

| Type | Description |
|---|---|
| `list[str]` | Sorted list of resource names with at least one entity having custom limits |

Example

    resources = limiter.list_resources_with_entity_configs()
    for resource in resources:
        entities, _ = limiter.list_entities_with_custom_limits(resource)
        print(f"{resource}: {len(entities)} entities with custom limits")

set_resource_defaults
Store default limit configs for a resource.
Resource defaults override system defaults for the specified resource.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource name | required |
| `limits` | `list[Limit]` | Limits to store | required |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_resource_defaults
Get stored default limit configs for a resource.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource name | required |

Returns:

| Type | Description |
|---|---|
| `list[Limit]` | List of stored limits (empty if none) |
delete_resource_defaults
Delete stored default limit configs for a resource.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource name | required |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
list_resources_with_defaults
List all resources that have default limit configs.
set_system_defaults
Store system-wide default limits and config.
System defaults apply to ALL resources unless overridden at resource or entity level.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `limits` | `list[Limit]` | Limits to store (apply globally to all resources) | required |
| `on_unavailable` | `OnUnavailable \| None` | Behavior when DynamoDB is unavailable (optional) | `None` |
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_system_defaults
Get system-wide default limits and config.
Returns:

| Type | Description |
|---|---|
| `tuple[list[Limit], OnUnavailable \| None]` | Tuple of (limits, on_unavailable). on_unavailable may be None if not set. |
delete_system_defaults
Delete all system-wide default limits and config.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `principal` | `str \| None` | Caller identity for audit logging (optional) | `None` |
get_resource_capacity
Get aggregated capacity for a resource across all entities.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `resource` | `str` | Resource to query | required |
| `limit_name` | `str` | Limit name to query | required |
| `parents_only` | `bool` | If True, only include parent entities | `False` |

Returns:

| Type | Description |
|---|---|
| `ResourceCapacity` | ResourceCapacity with aggregated data |
OnUnavailable
OnUnavailable
Bases: Enum
Behavior when DynamoDB is unavailable.
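An unavailability policy of this kind usually chooses between failing closed (reject the request) and failing open (let it through). The sketch below illustrates that choice with a stand-in enum. Only BLOCK is named in this reference; the `Policy` enum, its ALLOW member, `BackendUnavailable`, and `guarded_check` are hypothetical and do not describe the actual members of OnUnavailable.

```python
from enum import Enum
from typing import Callable


class Policy(Enum):
    """Hypothetical stand-in for an unavailability policy."""

    BLOCK = "block"  # fail closed: surface the outage to the caller
    ALLOW = "allow"  # assumed fail-open counterpart: let the request proceed


class BackendUnavailable(Exception):
    """Stand-in for the error raised when the backend is down and policy is BLOCK."""


def guarded_check(check: Callable[[], bool], policy: Policy) -> bool:
    """Run a rate limit check, applying `policy` if the backend cannot be reached."""
    try:
        return check()
    except ConnectionError as exc:
        if policy is Policy.BLOCK:
            raise BackendUnavailable("rate limiter backend unreachable") from exc
        return True  # fail open


def _backend_down() -> bool:
    raise ConnectionError("DynamoDB unreachable")


allowed = guarded_check(_backend_down, Policy.ALLOW)
try:
    guarded_check(_backend_down, Policy.BLOCK)
    blocked = False
except BackendUnavailable:
    blocked = True
```

Failing closed protects downstream resources during an outage, while failing open favors availability of the calling service; which is appropriate depends on what the limits protect.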