bug: throttle acquire timeout is hardcoded at 300s, causes early shutdown on slow inference endpoints #551

@andreatgretel

Description

Priority Level

Medium (Annoying but has workaround)

Describe the bug

When running the async engine against a slow inference endpoint (e.g. a large model on DGX Spark), the ThrottleManager.acquire_async() timeout causes spurious ModelTimeoutError failures that cascade into early shutdown and incomplete datasets.

The root cause is that DEFAULT_ACQUIRE_TIMEOUT is hardcoded to 300s in throttle_manager.py:31 and not exposed through ThrottleConfig or RunConfig. With a slow model (e.g. 60-120s per request) and AIMD starting at concurrency=1, tasks queued behind in-flight requests can wait 300s+ for a permit without ever starting a request. When the deadline expires, the throttle layer raises TimeoutError, which propagates as:

TimeoutError (throttle_manager.py:397)
  -> ProviderError(kind=TIMEOUT) (throttled.py:166)
    -> ModelTimeoutError (errors.py:291)
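Here is a minimal sketch of that layering, using simplified stand-in classes. The names mirror the issue, but the constructors and the use of `raise ... from` are assumptions. The real classes in throttled.py and errors.py may differ.

```python
from enum import Enum


# Simplified stand-ins; the real ProviderError / ProviderErrorKind /
# ModelTimeoutError may have different fields and constructors.
class ProviderErrorKind(Enum):
    TIMEOUT = "timeout"


class ProviderError(Exception):
    def __init__(self, kind: ProviderErrorKind, message: str) -> None:
        super().__init__(message)
        self.kind = kind


class ModelTimeoutError(Exception):
    pass


def throttled_call() -> None:
    try:
        # Throttle layer: the permit deadline expires before any request is sent.
        raise TimeoutError("acquire deadline expired")
    except TimeoutError as exc:
        # Client layer: wrap as a provider-level timeout.
        raise ProviderError(ProviderErrorKind.TIMEOUT, str(exc)) from exc


def model_call() -> None:
    try:
        throttled_call()
    except ProviderError as exc:
        # Facade layer: map the provider error kind to a model-level error.
        if exc.kind is ProviderErrorKind.TIMEOUT:
            raise ModelTimeoutError(str(exc)) from exc
        raise
```

In this sketch the scheduler only ever sees a ModelTimeoutError. The fact that the deadline expired in the permit queue, and not at the provider, survives only in the `__cause__` chain.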

ModelTimeoutError is retryable, so affected tasks are deferred to salvage rounds. However, the salvage rounds hit the same acquire timeout, the error rate crosses the early-shutdown threshold, and the scheduler shuts down early, dropping rows and producing an incomplete dataset.
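The shutdown trigger can be pictured as a sliding-window error-rate check. The sketch below is loosely modelled on the `shutdown_error_window` / `shutdown_error_rate` parameters used in the reproduction test. The class name is hypothetical, and the choice of `>=`, as well as the decision to wait for a full window, are assumptions about the real scheduler.

```python
from collections import deque


class ErrorRateWindow:
    """Hypothetical sliding-window error-rate check (not the real scheduler)."""

    def __init__(self, window: int, max_error_rate: float) -> None:
        self._window = window
        self._outcomes = deque(maxlen=window)  # True = error, False = success
        self._max_error_rate = max_error_rate

    def record(self, is_error: bool) -> None:
        # Oldest outcome falls out automatically once the window is full.
        self._outcomes.append(is_error)

    def should_shut_down(self) -> bool:
        # Assumption: only judge a full window, so one early error cannot trip it.
        if len(self._outcomes) < self._window:
            return False
        return sum(self._outcomes) / self._window >= self._max_error_rate
```

With the reproduction test's pattern (two successes, then timeouts), `window=4`, and `rate=0.5`, this check trips on the fourth outcome. Because every retry of a queued task fails the same way, salvage rounds only push more errors into the window.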

Importantly, the model itself responds correctly, just slowly. No request actually times out; the tasks fail while waiting in the concurrency queue, before they ever send a request.
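The same failure mode shows up without any model at all. The sketch below is a scaled-down stand-in, not how ThrottleManager is implemented. An `asyncio.Semaphore` plays the throttle permits, `asyncio.sleep` plays a slow but always-successful request, and times are scaled by 1/600. The function name is hypothetical.

```python
import asyncio


async def simulate(
    num_tasks: int, concurrency: int, latency_s: float, acquire_timeout_s: float
) -> tuple[list[int], list[int]]:
    """Return (indices that sent a request, indices that timed out in the queue)."""
    permits = asyncio.Semaphore(concurrency)
    started: list[int] = []
    timed_out: list[int] = []

    async def task(i: int) -> None:
        try:
            # Wait for a permit, but give up after acquire_timeout_s.
            await asyncio.wait_for(permits.acquire(), timeout=acquire_timeout_s)
        except asyncio.TimeoutError:
            timed_out.append(i)  # this task never sent a request
            return
        try:
            started.append(i)
            await asyncio.sleep(latency_s)  # the "request" itself always succeeds
        finally:
            permits.release()

    await asyncio.gather(*(task(i) for i in range(num_tasks)))
    return started, timed_out
```

With a 0.2s "request" and a 0.5s acquire deadline (the same 2.5 ratio as 120s against 300s), `simulate(6, 1, 0.2, 0.5)` lets tasks 0 to 2 run while tasks 3 to 5 time out without sending anything. Raising the deadline to 2.0s lets all six complete, which is the effect a configurable acquire timeout would have.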

Steps/Code to reproduce bug

The scenario can be reproduced with a mock slow generator that raises ModelTimeoutError after a few successful calls, simulating what the throttle layer does when permits are exhausted:

Mock generators and reproduction test

```python
import asyncio
import time
from typing import Any
from unittest.mock import MagicMock

import pytest

# The project classes used below (ColumnGenerator, ExpressionColumnConfig,
# GenerationStrategy, ModelTimeoutError, SamplerColumnConfig, SamplerType,
# LLMTextColumnConfig, ExecutionGraph, CompletionTracker,
# RowGroupBufferManager, AsyncTaskScheduler) and the test helpers
# (_mock_provider, _expr_config, MockSeedGenerator, MODEL_ALIAS) come from the
# repository and its async scheduler tests; their imports are not shown here.


class MockSlowCellGenerator(ColumnGenerator[ExpressionColumnConfig]):
    """Cell generator that simulates DGX Spark-class latency.

    Each call sleeps for ``latency`` seconds before returning. When used behind
    the throttle layer with a tight concurrency limit and the default 300s
    acquire timeout, queued tasks time out before getting a permit.

    For testing, ``timeout_after`` makes the generator raise ModelTimeoutError
    after the Nth call, reproducing what the ThrottledModelClient does when the
    throttle acquire deadline expires.
    """

    def __init__(
        self,
        *args: Any,
        latency: float = 0.0,
        timeout_after: int | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self._latency = latency
        self._timeout_after = timeout_after
        self._call_count = 0

    @staticmethod
    def get_generation_strategy() -> GenerationStrategy:
        return GenerationStrategy.CELL_BY_CELL

    def generate(self, data: dict) -> dict:
        self._call_count += 1
        if self._timeout_after is not None and self._call_count > self._timeout_after:
            # Mimic the error surfaced when no throttle permit is granted in time.
            raise ModelTimeoutError(
                "Throttle acquire timed out after 300s for slow-provider/large-model [chat]"
            )
        time.sleep(self._latency)  # blocking stand-in for a slow request
        data[self.config.name] = f"slow_{data.get('seed', '?')}"
        return data

    async def agenerate(self, data: dict) -> dict:
        self._call_count += 1
        if self._timeout_after is not None and self._call_count > self._timeout_after:
            # Mimic the error surfaced when no throttle permit is granted in time.
            raise ModelTimeoutError(
                "Throttle acquire timed out after 300s for slow-provider/large-model [chat]"
            )
        await asyncio.sleep(self._latency)  # non-blocking stand-in for a slow request
        data[self.config.name] = f"slow_{data.get('seed', '?')}"
        return data


@pytest.mark.asyncio(loop_scope="session")
async def test_scheduler_slow_model_timeout_triggers_early_shutdown() -> None:
    provider = _mock_provider()
    num_records = 6

    configs = [
        SamplerColumnConfig(name="seed", sampler_type=SamplerType.CATEGORY, params={"values": ["A"]}),
        LLMTextColumnConfig(name="answer", prompt="{{ seed }}", model_alias=MODEL_ALIAS),
    ]
    strategies = {
        "seed": GenerationStrategy.FULL_COLUMN,
        "answer": GenerationStrategy.CELL_BY_CELL,
    }

    slow_gen = MockSlowCellGenerator(
        config=_expr_config("answer"),
        resource_provider=provider,
        timeout_after=2,  # First 2 calls succeed, then every call times out
    )
    generators: dict[str, ColumnGenerator] = {
        "seed": MockSeedGenerator(config=_expr_config("seed"), resource_provider=provider),
        "answer": slow_gen,
    }

    graph = ExecutionGraph.create(configs, strategies)
    row_groups = [(0, num_records)]
    tracker = CompletionTracker.with_graph(graph, row_groups)
    buffer_manager = RowGroupBufferManager(MagicMock())

    # Small window and threshold so the timeouts trip early shutdown quickly.
    scheduler = AsyncTaskScheduler(
        generators=generators,
        graph=graph,
        tracker=tracker,
        row_groups=row_groups,
        buffer_manager=buffer_manager,
        shutdown_error_window=4,
        shutdown_error_rate=0.5,
    )
    await asyncio.wait_for(scheduler.run(), timeout=10.0)

    assert scheduler._early_shutdown, "Early shutdown flag should be set"
    dropped = sum(1 for ri in range(num_records) if tracker.is_dropped(0, ri))
    assert dropped > 0, "Expected some rows to be dropped due to timeout errors"
```

Expected behavior

Users should be able to configure the throttle acquire timeout via ThrottleConfig (e.g. acquire_timeout: float = 300.0) so that slow-inference endpoints don't trigger spurious timeouts. The current workaround is disable_early_shutdown: true in RunConfig, but that masks all errors, not just throttle queue timeouts.
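A minimal sketch of the requested option, assuming a plain dataclass. The real ThrottleConfig may be a pydantic model or validate differently, and its AIMD fields are omitted here.

```python
from dataclasses import dataclass

# Mirrors the value currently hardcoded as DEFAULT_ACQUIRE_TIMEOUT.
DEFAULT_ACQUIRE_TIMEOUT = 300.0


@dataclass(frozen=True)
class ThrottleConfig:
    """Hypothetical sketch of the proposed field only."""

    # Maximum seconds a task may wait in the queue for a throttle permit.
    acquire_timeout: float = DEFAULT_ACQUIRE_TIMEOUT

    def __post_init__(self) -> None:
        if self.acquire_timeout <= 0:
            raise ValueError("acquire_timeout must be a positive number of seconds")
```

A slow-endpoint run could then set, for example, `acquire_timeout=1800.0` instead of disabling early shutdown. Genuine provider errors would still count toward the error-rate threshold.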

Agent Diagnostic / Prior Investigation

  • DEFAULT_ACQUIRE_TIMEOUT is defined at throttle_manager.py:31 (300s)
  • Used by acquire_sync (line 324) and acquire_async (line 373) with no override path from config
  • ThrottleConfig in run_config.py has AIMD tuning knobs but no acquire timeout field
  • ThrottledModelClient._athrottled() catches TimeoutError and wraps it as ProviderError(kind=TIMEOUT) (throttled.py:165-171)
  • The model facade maps ProviderErrorKind.TIMEOUT to ModelTimeoutError (errors.py:291)
  • ModelTimeoutError is in _RETRYABLE_MODEL_ERRORS (async_scheduler.py:49-54), so it is deferred and retried, but the retries hit the same timeout
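Here is one possible override path, sketched with a hypothetical `ThrottleSketch` class rather than the real ThrottleManager, whose acquire_sync/acquire_async signatures are not shown in this issue. The timeout is taken from a per-call argument first, then from config, and only then from the module default.

```python
from __future__ import annotations

import asyncio

DEFAULT_ACQUIRE_TIMEOUT = 300.0


class ThrottleSketch:
    """Hypothetical stand-in for ThrottleManager with a configurable timeout."""

    def __init__(self, max_concurrency: int, acquire_timeout: float | None = None) -> None:
        self._permits = asyncio.Semaphore(max_concurrency)
        self._acquire_timeout = acquire_timeout  # e.g. from ThrottleConfig

    def effective_timeout(self, timeout: float | None = None) -> float:
        # Per-call override wins, then config, then the module default.
        if timeout is not None:
            return timeout
        if self._acquire_timeout is not None:
            return self._acquire_timeout
        return DEFAULT_ACQUIRE_TIMEOUT

    async def acquire_async(self, timeout: float | None = None) -> None:
        # Raises asyncio.TimeoutError on expiry; a client layer would wrap it as before.
        await asyncio.wait_for(self._permits.acquire(), timeout=self.effective_timeout(timeout))

    def release(self) -> None:
        self._permits.release()
```

Feeding `ThrottleConfig.acquire_timeout` into the constructor would keep 300s as the default for existing users while letting slow-endpoint runs raise it.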

Additional context

Reported by @mvansegbroeck running on DGX Spark with large models where per-request latency is 60-120s. The issue is specific to slow endpoints with low concurrency; fast endpoints or high-concurrency setups don't keep the queue saturated long enough to hit the 300s deadline.
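This back-of-the-envelope sketch shows why latency and concurrency both matter. It assumes every task is queued at once, a fixed per-request latency, and a fixed concurrency limit. Real AIMD concurrency moves over time, and the helper names are hypothetical.

```python
def estimated_permit_wait(index: int, latency_s: float, concurrency: int) -> float:
    """Seconds the task at 0-based queue `index` waits for a permit.

    Tasks are served in waves of `concurrency`; the task at `index` must wait
    for `index // concurrency` full waves of requests to finish.
    """
    return (index // concurrency) * latency_s


def first_timed_out_index(latency_s: float, concurrency: int, deadline_s: float = 300.0) -> int:
    """Smallest queue index whose permit wait strictly exceeds the deadline."""
    # Need (index // concurrency) * latency_s > deadline_s,
    # i.e. index // concurrency >= deadline_s // latency_s + 1.
    return concurrency * (int(deadline_s // latency_s) + 1)
```

With `concurrency=1`, the task at index 3 (fourth in line) already waits 360s at 120s per request. At 60s per request, the first timeout lands at index 6. At `concurrency=8` and 120s, the first 24 tasks all get a permit in time.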

Checklist

  • I reproduced this issue or provided a minimal example
  • I searched the docs/issues myself, or had my agent do so
  • If I used an agent, I included its diagnostics above

Metadata

  • Assignees: no one assigned
  • Labels: bug (Something isn't working)
  • Type: no type
  • Projects: no projects
  • Milestone: no milestone
  • Relationships: none yet
  • Development: no branches or pull requests