Allow Controlling Concurrency Limit Strategies #163

@abrookins

Description

Problem Statement

Docket's concurrency limits feature makes it difficult to implement any pattern other than basing a limit on an argument passed to a task function. These limitations stem from a hard-coded strategy for generating concurrency limit keys.

1. Limited Global Function Limits

Currently, implementing a global limit for a function (regardless of arguments) requires awkward workarounds:

```python
# Current workaround - requires dummy parameter
async def heavy_computation(
    data: dict,
    task_type: str = "computation",  # Dummy parameter for concurrency grouping
    concurrency: ConcurrencyLimit = ConcurrencyLimit("task_type", max_concurrent=3)
):
    await process_data(data)
```

2. Limited Multi-Argument Grouping

No support for grouping by combinations of arguments:

```python
# Not possible: limit by a combination of arguments
# Want: max 2 tasks per (account_id, department_id) pair
async def curiously_codependent_task(
    data: dict,
    account_id: str,
    department_id: str,
    # Can only give one field name currently
    concurrency: ConcurrencyLimit = ConcurrencyLimit("account_id", max_concurrent=3)
):
    await process_data(data)
```

3. Sharing Concurrency Limit Objects

NOTE: I documented a limitation here that was incorrect. I'm pretty sure you can create a limit object and then reference it from multiple functions.
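For reference, sharing one limit object across tasks looks like this. The `ConcurrencyLimit` below is a minimal stand-in mirroring the issue's examples, not Docket's actual class:

```python
# Minimal stand-in for ConcurrencyLimit, mirroring this issue's examples;
# Docket's real class has more to it.
from dataclasses import dataclass


@dataclass(frozen=True)
class ConcurrencyLimit:
    argument: str
    max_concurrent: int


# One shared limit object, referenced as the default in two task functions,
# so both draw from the same concurrency pool.
billing_limit = ConcurrencyLimit("customer_id", max_concurrent=2)


async def charge_card(customer_id: str,
                      concurrency: ConcurrencyLimit = billing_limit):
    ...


async def send_invoice(customer_id: str,
                       concurrency: ConcurrencyLimit = billing_limit):
    ...
```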

Proposed Solution

Implement a flexible, strategy-based concurrency system built around a LIMIT_BY namespace that provides:

1. Clean Strategy API

```python
from docket import ConcurrencyLimit, LIMIT_BY

# Function argument grouping works the same as before
ConcurrencyLimit("customer_id", max_concurrent=1)

# But now this translates behind the scenes to the new approach
ConcurrencyLimit(LIMIT_BY.fn_argument("customer_id"), max_concurrent=1)

# Group limits by function name (in other words, by task)
ConcurrencyLimit(LIMIT_BY.fn_name, max_concurrent=3)

# Cross-function, shared, global limits not based on a function argument
ConcurrencyLimit(LIMIT_BY.constant("billing_ops"), max_concurrent=2)

# Multi-argument grouping
ConcurrencyLimit(["region", "priority"], max_concurrent=5)

# Behind the scenes, this translates to...
ConcurrencyLimit(LIMIT_BY.fn_argument("region", "priority"), max_concurrent=5)

# Custom grouping logic: any callable works
ConcurrencyLimit(LIMIT_BY.fn(lambda args: f"{args['api_key']}:{args['tier']}"), max_concurrent=10)
```
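One way the LIMIT_BY namespace could reduce to code: assuming every strategy boils down to "arguments dict in, string key out", a sketch (not Docket's real internals; the `__fn_name__` key is a made-up convention) might be:

```python
# Hypothetical sketch of a LIMIT_BY namespace where each strategy is a
# callable that maps a task's arguments dict to a grouping key.
from typing import Callable

KeyFn = Callable[[dict], str]


class LIMIT_BY:
    @staticmethod
    def fn_argument(*names: str) -> KeyFn:
        # Join one or more argument values into a single grouping key
        return lambda args: ":".join(str(args[n]) for n in names)

    @staticmethod
    def constant(key: str) -> KeyFn:
        # Same key for every execution: a shared, global limit
        return lambda args: key

    @staticmethod
    def fn(key_fn: KeyFn) -> KeyFn:
        # Arbitrary user-supplied grouping logic
        return key_fn

    # Group by the task function's name; assumes the worker passes it
    # through under a reserved key (illustrative convention only).
    fn_name = staticmethod(lambda args: args["__fn_name__"])
```

A worker would then call the resolved key function once per execution and count in-flight tasks per key.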

2. Backwards Compatibility

```python
# Existing code continues to work unchanged
ConcurrencyLimit("customer_id", max_concurrent=1)  # Still works
```

3. Extensible Custom Strategies

```python
from docket import ConcurrencyLimit, ConcurrencyStrategy


class ApiRateLimitStrategy(ConcurrencyStrategy):
    def get_key(self, execution) -> str:
        api_key = execution.get_argument("api_key")
        # Perhaps we need runtime data -- hopefully, this is a fast call!
        tier = get_tier_for_rate_account(api_key)
        return f"api:{api_key}:{tier}"


ConcurrencyLimit(ApiRateLimitStrategy(), max_concurrent=5)
```

Future Enhancements

This design provides a foundation for future concurrency control features:

Conflict Resolution Strategies

```python
ConcurrencyLimit(
    LIMIT_BY.fn_argument("customer_id"),
    max_concurrent=1,
    on_conflict=ConflictResolution.CANCEL  # vs QUEUE, PRIORITY, etc.
)
```
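To make the on_conflict idea concrete, here is a hypothetical sketch of a ConflictResolution enum and the branch a worker might take when a limit is full. All names and outcomes are illustrative, not a proposed API:

```python
# Hypothetical sketch: a policy enum a worker could branch on when a task
# arrives and its concurrency limit is already saturated.
from enum import Enum, auto


class ConflictResolution(Enum):
    QUEUE = auto()     # wait for a slot (today's behavior)
    CANCEL = auto()    # drop the incoming task
    PRIORITY = auto()  # preempt a lower-priority running task


def resolve(active: int, max_concurrent: int, policy: ConflictResolution) -> str:
    # Below the limit: run immediately, regardless of policy
    if active < max_concurrent:
        return "run"
    # At the limit: the policy decides what happens to the incoming task
    if policy is ConflictResolution.CANCEL:
        return "cancel"
    if policy is ConflictResolution.PRIORITY:
        return "preempt"
    return "queue"
```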
