# Allow Controlling Concurrency Limit Strategies #163
Status: Closed · Label: devx (Makes Docket the most ergonomic background task system)
## Problem Statement

Docket's concurrency limits feature has limitations that make it difficult to implement any pattern other than basing a limit on a single argument passed to a task function. These limitations stem from the hard-coded strategy used to generate concurrency limit keys.
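Conceptually, today's behavior amounts to deriving a limit key from exactly one named argument. A rough, hypothetical sketch of that idea (`make_limit_key` is invented here for illustration and is not Docket's actual internals):

```python
def make_limit_key(field: str, kwargs: dict) -> str:
    # Hypothetical illustration: the hard-coded strategy keys a limit
    # on exactly one named task argument, and nothing else.
    return f"{field}:{kwargs[field]}"

# Every call with the same customer_id competes for the same limit slots
key = make_limit_key("customer_id", {"customer_id": "acme", "data": {}})
# -> "customer_id:acme"
```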
### 1. Limited Global Function Limits
Currently, implementing a global limit for a function (regardless of its arguments) requires an awkward workaround:

```python
# Current workaround - requires a dummy parameter
async def heavy_computation(
    data: dict,
    task_type: str = "computation",  # Dummy parameter for concurrency grouping
    concurrency: ConcurrencyLimit = ConcurrencyLimit("task_type", max_concurrent=3),
):
    await process_data(data)
```

### 2. Limited Multi-Argument Grouping
There is no support for grouping by a combination of arguments:

```python
# Not possible: limit by a combination of arguments
# Want: a limit per (account_id, department_id) pair
async def curiously_codependent_task(
    data: dict,
    account_id: str,
    department_id: str,
    # Can only give one field name currently
    concurrency: ConcurrencyLimit = ConcurrencyLimit("account_id", max_concurrent=3),
):
    await process_data(data)
```

### 3. Sharing Concurrency Limit Objects
NOTE: I originally documented a limitation here that was incorrect. I'm pretty sure you can already create a single limit object and reference it from multiple functions.
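For completeness, that sharing pattern looks something like the following. This is a minimal sketch using a stand-in `ConcurrencyLimit` class, since the point is only that several task functions can reference one limit object, not Docket's real implementation:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ConcurrencyLimit:
    # Stand-in for docket's ConcurrencyLimit; the real signature may differ.
    argument: str
    max_concurrent: int

# One limit object, referenced as the default in several task functions,
# so they all contend for the same concurrency slots.
BILLING_LIMIT = ConcurrencyLimit("customer_id", max_concurrent=2)

async def charge_card(customer_id: str, limit: ConcurrencyLimit = BILLING_LIMIT):
    ...

async def issue_refund(customer_id: str, limit: ConcurrencyLimit = BILLING_LIMIT):
    ...

# Both defaults are literally the same object
assert charge_card.__defaults__[0] is issue_refund.__defaults__[0]
```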
## Proposed Solution

Implement a flexible, strategy-based concurrency system built around a LIMIT_BY namespace that provides:

### 1. Clean Strategy API

```python
from docket import ConcurrencyLimit, LIMIT_BY

# Function argument grouping works the same as before
ConcurrencyLimit("customer_id", max_concurrent=1)

# But now this translates behind the scenes to the new approach
ConcurrencyLimit(LIMIT_BY.fn_argument("customer_id"), max_concurrent=1)

# Group limits by function name (in other words, by task)
ConcurrencyLimit(LIMIT_BY.fn_name, max_concurrent=3)

# Cross-function, shared, global limits not based on a function argument
ConcurrencyLimit(LIMIT_BY.constant("billing_ops"), max_concurrent=2)

# Multi-argument grouping
ConcurrencyLimit(["region", "priority"], max_concurrent=5)

# Behind the scenes, this translates to...
ConcurrencyLimit(LIMIT_BY.fn_argument("region", "priority"), max_concurrent=5)

# Custom grouping logic: implement with any callable
ConcurrencyLimit(
    LIMIT_BY.fn(lambda args: f"{args['api_key']}:{args['tier']}"),
    max_concurrent=10,
)
```

### 2. Backwards Compatibility
```python
# Existing code continues to work unchanged
ConcurrencyLimit("customer_id", max_concurrent=1)  # Still works
```

### 3. Extensible Custom Strategies
```python
class ApiRateLimitStrategy(ConcurrencyStrategy):
    def get_key(self, execution) -> str:
        api_key = execution.get_argument("api_key")
        # Perhaps we need runtime data -- hopefully, this is a fast call!
        tier = get_tier_for_rate_account(api_key)
        return f"api:{api_key}:{tier}"

ConcurrencyLimit(ApiRateLimitStrategy(), max_concurrent=5)
```

## Future Enhancements
This design provides a foundation for future concurrency control features:
### Conflict Resolution Strategies
```python
ConcurrencyLimit(
    LIMIT_BY.fn_argument("customer_id"),
    max_concurrent=1,
    on_conflict=ConflictResolution.CANCEL,  # vs QUEUE, PRIORITY, etc.
)
```
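To make the key-generation side of the proposal concrete, here is a small, self-contained sketch of how the proposed LIMIT_BY strategies could reduce to key-producing callables. This is purely illustrative: the names mirror the proposal above, but none of it is Docket's actual implementation (`fn_name` is omitted since it needs the task function's identity rather than its arguments):

```python
from typing import Callable, Mapping

KeyFn = Callable[[Mapping[str, object]], str]

class LIMIT_BY:
    """Toy stand-in for the proposed namespace: every strategy is a key function."""

    @staticmethod
    def fn_argument(*fields: str) -> KeyFn:
        # Key on one or more task arguments, joined for multi-argument grouping
        return lambda args: ":".join(str(args[f]) for f in fields)

    @staticmethod
    def constant(name: str) -> KeyFn:
        # Cross-function global limits share one fixed key
        return lambda args: name

    @staticmethod
    def fn(func: KeyFn) -> KeyFn:
        # Arbitrary custom grouping logic, any callable
        return func

args = {"region": "us-east", "priority": "high", "api_key": "k1", "tier": "gold"}

assert LIMIT_BY.fn_argument("region", "priority")(args) == "us-east:high"
assert LIMIT_BY.constant("billing_ops")(args) == "billing_ops"
assert LIMIT_BY.fn(lambda a: f"{a['api_key']}:{a['tier']}")(args) == "k1:gold"
```

Tasks whose strategies produce the same key would then contend for the same `max_concurrent` slots, regardless of which function they came from.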