feat: per-channel concurrency queue + admission control #1503
PR #1322 only added scoring-layer down-ranking, which was a no-op when a model only had one channel. This change adds true admission enforcement.

Backend:
- ChannelLimiter: soft mode (count only) and hard mode (FIFO queue + capacity ceiling + per-channel timeout), with slot-handoff Release for FIFO fairness.
- ChannelLimiterManager: per-channel limiter cache keyed by config struct equality; exposes Snapshot for observability.
- ChannelQueueError: synthetic 429 + structured body (channel_queue_full / channel_queue_timeout). No Retry-After to avoid the cooldown middleware misclassifying local rejections as upstream 429s.
- New ChannelRateLimit fields: QueueSize, QueueTimeoutMs.
- Middleware order: channel admission runs before rate-limit tracking so locally rejected requests do not consume RPM budget.
- LB scoring composes RPM/TPM and concurrency sub-scores via min(); hard-mode queueing capped at 30% so idle peers keep a clear advantage.
- Removed legacy ConnectionTracker; ChannelLimiterManager is the single source of truth for in-flight stats.
- OpenTelemetry: 5 channel-level instruments (inflight / queue_waiting gauges, queue_full / queue_timeout counters, queue_wait_seconds histogram).
- GraphQL Channel.liveLimiterStats field for real-time UI display.
- ValidateRateLimit guards against negatives and queueSize without maxConcurrent.

Frontend:
- channels-rate-limit-dialog: queueSize + queueTimeoutMs inputs with cross-field validation.
- channel-limiter-cell: text-only inFlight/cap and Q waiting/queueSize display, color-coded by utilization, polled every 5s.
- i18n strings for new fields and tooltips (en + zh-CN).
When a user configures only MaxConcurrent and leaves Queue Size empty, the limiter runs in soft mode — the cap only down-ranks the channel in load-balancer scoring, it does NOT block excess upstream requests. This is easy to miss; surface an inline amber advisory under the Queue Size field so users know to set Queue Size for hard enforcement.
Pull request overview
Adds true per-channel concurrency admission control (hard/soft modes) with FIFO queueing, typed 429 queue errors, OTel observability, and live UI feedback (GraphQL + frontend polling/rendering) to close #1130.
Changes:
- Backend: introduce ChannelLimiter/ChannelLimiterManager, queue-aware error typing, middleware ordering update, strategy scoring updates, and metrics instrumentation.
- API/GraphQL: add ChannelRateLimit.QueueSize/QueueTimeoutMs and Channel.liveLimiterStats.
- Frontend: add queue config inputs + validation, live limiter cell in channels table, and polling to keep stats fresh.
Reviewed changes
Copilot reviewed 58 out of 58 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| internal/server/orchestrator/tester.go | Wire test orchestrator to use ChannelLimiterManager instead of legacy connection tracking. |
| internal/server/orchestrator/rate_limit_tracking_test.go | Add regression test ensuring local queue errors don’t trigger cooldown. |
| internal/server/orchestrator/rate_limit_tracking.go | Ignore local queue rejections for cooldown tracking. |
| internal/server/orchestrator/quota_minute_test.go | Update orchestrator construction to use limiter manager. |
| internal/server/orchestrator/outbound.go | Prevent retrying on local queue rejection errors. |
| internal/server/orchestrator/orchestrator_streaming_test.go | Replace connection-tracking tests with queue/RPM ordering + limiter lifecycle coverage. |
| internal/server/orchestrator/orchestrator_error_test.go | Update orchestrator construction to use limiter manager. |
| internal/server/orchestrator/orchestrator_basic_test.go | Update orchestrator construction to use limiter manager. |
| internal/server/orchestrator/orchestrator.go | Add channel limiter middleware (before rate-limit tracking), metrics registration, and strategy wiring. |
| internal/server/orchestrator/model_circuit_breaker.go | Skip recording model errors for local queue rejections. |
| internal/server/orchestrator/lb_strategy_rate_limit_test.go | Expand strategy tests to cover soft/hard concurrency + min-composition with RPM. |
| internal/server/orchestrator/lb_strategy_rate_limit.go | Rework scoring to compose RPM/TPM with concurrency stats from limiter manager. |
| internal/server/orchestrator/lb_strategy_bp_test.go | Remove tests for legacy ConnectionAwareStrategy. |
| internal/server/orchestrator/lb_strategy_bp.go | Remove legacy ConnectionAwareStrategy and its tracker interface. |
| internal/server/orchestrator/fx_module.go | Provide ChannelLimiterManager via Fx. |
| internal/server/orchestrator/connection_tracking_test.go | New tests for channel-limiter middleware behavior and release safety. |
| internal/server/orchestrator/connection_tracking.go | Replace connection-tracking middleware with channel admission middleware. |
| internal/server/orchestrator/connection_tracker_test.go | Remove legacy connection tracker tests. |
| internal/server/orchestrator/connection_tracker.go | Remove legacy in-memory connection tracker. |
| internal/server/orchestrator/channel_queue_error_test.go | Add tests for typed queue error wrapping + 429 shaping without Retry-After. |
| internal/server/orchestrator/channel_queue_error.go | Implement ChannelQueueError + wrapping helpers and detection. |
| internal/server/orchestrator/channel_limiter_test.go | Add unit tests for soft/hard limiter modes, FIFO fairness, and leak prevention. |
| internal/server/orchestrator/channel_limiter_metrics_test.go | Add tests for limiter metrics instruments and callbacks. |
| internal/server/orchestrator/channel_limiter_metrics.go | Implement OTel gauges/counters/histogram for limiter observability. |
| internal/server/orchestrator/channel_limiter_manager_test.go | Add tests for limiter config extraction, caching, rebuild-on-change, and stats. |
| internal/server/orchestrator/channel_limiter_manager.go | Implement per-channel limiter cache keyed by normalized config equality. |
| internal/server/orchestrator/channel_limiter.go | Implement per-channel limiter with soft counting and hard FIFO queueing + timeout. |
| internal/server/orchestrator/channel_help_test.go | Update test helper to construct orchestrator with limiter manager. |
| internal/server/orchestrator/candidates_loadbalance_test.go | Remove connection-tracking-based selector test and simplify selector wiring. |
| internal/server/orchestrator/candidates_decorator_test.go | Update selector wiring to remove connection tracker dependency. |
| internal/server/orchestrator/candidates_basic_test.go | Update selector wiring to remove connection tracker dependency. |
| internal/server/gql/resolver.go | Inject limiter manager into GraphQL resolver root. |
| internal/server/gql/models_gen.go | Add ChannelLimiterStats model type for GraphQL. |
| internal/server/gql/graphql.go | Add limiter manager to GraphQL handler dependencies. |
| internal/server/gql/generated.go | Update generated schema/types for new rate-limit fields + liveLimiterStats. |
| internal/server/gql/axonhub.resolvers.go | Implement Channel.liveLimiterStats resolver using limiter manager stats. |
| internal/server/gql/axonhub.graphql | Extend schema with queue fields and ChannelLimiterStats + liveLimiterStats. |
| internal/server/biz/channel_rate_limit_test.go | Add validation tests for new queue fields and invariants. |
| internal/server/biz/channel_rate_limit.go | Implement ValidateRateLimit for non-negative fields + queue invariants. |
| internal/server/biz/channel_limiter_hook.go | Add biz-layer hook interface to allow orchestrator to forget limiter entries. |
| internal/server/biz/channel_bulk.go | Forget limiter entries on bulk delete. |
| internal/server/biz/channel.go | Validate rate-limit settings on create/update; forget limiter on delete. |
| internal/server/api/playground.go | Pass limiter manager into orchestrator constructor. |
| internal/server/api/openai.go | Pass limiter manager into orchestrator constructor for OpenAI endpoints. |
| internal/server/api/jina.go | Pass limiter manager into orchestrator constructor for Jina endpoints. |
| internal/server/api/gemini.go | Pass limiter manager into orchestrator constructor for Gemini endpoints. |
| internal/server/api/doubao.go | Pass limiter manager into orchestrator constructor for Doubao endpoints. |
| internal/server/api/anthropic.go | Pass limiter manager into orchestrator constructor for Anthropic endpoints. |
| internal/server/api/aisdk.go | Pass limiter manager into orchestrator constructor for AI SDK endpoints. |
| internal/objects/channel.go | Add QueueSize and QueueTimeoutMs to channel rate-limit settings. |
| frontend/src/routeTree.gen.ts | Regenerate route typings/paths (notably trailing-slash fullPaths). |
| frontend/src/locales/zh-CN/channels.json | Add limiter/queue UI strings (zh-CN). |
| frontend/src/locales/en/channels.json | Add limiter/queue UI strings (en). |
| frontend/src/features/channels/data/schema.ts | Extend channel schemas to include queue fields and liveLimiterStats. |
| frontend/src/features/channels/data/channels.ts | Query queue fields + liveLimiterStats; poll channels query every 5s. |
| frontend/src/features/channels/components/channels-rate-limit-dialog.tsx | Add queue inputs, cross-field validation, and soft-mode advisory UI. |
| frontend/src/features/channels/components/channels-columns.tsx | Render ChannelLimiterCell under health column when stats present. |
| frontend/src/features/channels/components/channel-limiter-cell.tsx | New compact, color-coded cell with tooltip for in-flight/queue stats. |
    rl := channel.Settings.RateLimit
    capacity := int(*rl.MaxConcurrent)
scoreConcurrency assumes channel.Settings.RateLimit and RateLimit.MaxConcurrent are non-nil when a limiter entry exists (ok==true). If a channel previously had a limiter but is later updated to remove/clear MaxConcurrent (or Settings/RateLimit), ChannelLimiterManager will still return ok==true and this will panic on channel.Settings.RateLimit / *rl.MaxConcurrent. Add defensive checks here (treat as “no limiter configured” and return maxScore) and/or ensure the manager drops stale entries when concurrency limiting is disabled so Stats() can’t return ok==true for a now-unlimited channel.
    func (m *ChannelLimiterManager) GetOrCreate(ch *biz.Channel) *ChannelLimiter {
        cfg := extractLimiterConfig(ch)
        if cfg.capacity == 0 {
            return nil
        }
GetOrCreate returns nil when concurrency limiting is disabled (cfg.capacity == 0), but it does not remove any existing cached entry for that channel. If a channel used to have MaxConcurrent set and later clears it, the old limiter remains in entries, so Stats()/Snapshot() will still report load and (more importantly) scoring code may assume MaxConcurrent is non-nil. Consider deleting the existing entry when cfg.capacity == 0 (e.g., Forget(ch.ID) under lock) to prevent stale stats and potential nil-deref panics in callers.
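The cleanup this comment asks for could look roughly like the sketch below. It is a simplified model, not the PR's ChannelLimiterManager: the types manager, entry and limiter, the int channel ID, and the Has helper are all hypothetical, and the limiter itself is reduced to a placeholder.

```go
package main

import (
	"fmt"
	"sync"
)

// limiterConfig is compared with ==, so it may only contain comparable fields.
type limiterConfig struct {
	capacity       int
	queueSize      int
	queueTimeoutMs int64
}

// limiter is a placeholder for the real per-channel limiter.
type limiter struct{ cfg limiterConfig }

type entry struct {
	cfg     limiterConfig
	limiter *limiter
}

type manager struct {
	mu      sync.RWMutex
	entries map[int]entry
}

func newManager() *manager { return &manager{entries: map[int]entry{}} }

// GetOrCreate reuses the cached limiter while the config is unchanged, rebuilds it
// when the config differs, and drops the entry once limiting is disabled.
func (m *manager) GetOrCreate(channelID int, cfg limiterConfig) *limiter {
	if cfg.capacity == 0 {
		m.mu.Lock()
		delete(m.entries, channelID) // no stale stats for an unlimited channel
		m.mu.Unlock()
		return nil
	}

	m.mu.RLock()
	e, ok := m.entries[channelID]
	m.mu.RUnlock()
	if ok && e.cfg == cfg {
		return e.limiter
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	// Re-check under the write lock: another goroutine may have rebuilt the entry.
	if e, ok := m.entries[channelID]; ok && e.cfg == cfg {
		return e.limiter
	}
	l := &limiter{cfg: cfg}
	m.entries[channelID] = entry{cfg: cfg, limiter: l}
	return l
}

// Has reports whether a limiter is cached for the channel.
func (m *manager) Has(channelID int) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	_, ok := m.entries[channelID]
	return ok
}

func main() {
	m := newManager()
	cfg := limiterConfig{capacity: 2, queueSize: 3, queueTimeoutMs: 1500}
	a := m.GetOrCreate(1, cfg)
	b := m.GetOrCreate(1, cfg)
	fmt.Println("same limiter reused:", a == b)

	m.GetOrCreate(1, limiterConfig{}) // MaxConcurrent cleared
	fmt.Println("entry still cached:", m.Has(1))
}
```

Deleting under the write lock is idempotent, so this sketch skips the read-lock existence check used in the suggested change further down the thread.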
    // Intentionally NO forgetLimiter call: ChannelLimiterManager.GetOrCreate
    // already detects rate-limit changes via cfg equality and rebuilds on the
    // next request. Calling Forget on every update (including unrelated
    // settings) would orphan in-flight slots and let the next batch of
    // requests transiently exceed MaxConcurrent.
UpdateChannel never calls forgetLimiter anymore. That’s fine for unrelated updates, but when an update disables concurrency limiting (e.g., sets settings.rateLimit.maxConcurrent to nil/0 or clears settings.rateLimit), the orchestrator’s ChannelLimiterManager can retain a stale limiter entry indefinitely unless something explicitly forgets it. Suggest calling forgetLimiter(id) only for the “limiter disabled” case (or otherwise ensuring the manager drops entries when MaxConcurrent is cleared) to avoid stale stats and potential nil-deref panics in downstream scoring.
    "channels.dialogs.rateLimit.fields.queueTimeoutMs.placeholder": "毫秒,留空表示不超时",
    "channels.dialogs.rateLimit.fields.queueTimeoutMs.description": "请求在队列中可等待的最长时间(毫秒),超时后以 HTTP 429 拒绝。仅在设置队列长度时生效。",
    "channels.dialogs.rateLimit.errors.queueRequiresMaxConcurrent": "设置排队队列长度时,必须同时设置最大并发数(大于 0)。",
    "channels.dialogs.rateLimit.hints.softModeWarning": "未设置队列长度时,最大并发数仅在负载均衡评分中将此渠道降权,**不会真正阻塞超出的请求**。如需硬性限制,请同时设置队列长度(例如 1)和队列等待超时。",
The zh-CN string channels.dialogs.rateLimit.hints.softModeWarning contains markdown-style **...**, but it’s rendered as plain text in the dialog (inside a <span>). This will likely show the asterisks literally. Consider removing the markdown markers or switching to a rendering path that supports emphasis consistently across locales.
    - "channels.dialogs.rateLimit.hints.softModeWarning": "未设置队列长度时,最大并发数仅在负载均衡评分中将此渠道降权,**不会真正阻塞超出的请求**。如需硬性限制,请同时设置队列长度(例如 1)和队列等待超时。",
    + "channels.dialogs.rateLimit.hints.softModeWarning": "未设置队列长度时,最大并发数仅在负载均衡评分中将此渠道降权,不会真正阻塞超出的请求。如需硬性限制,请同时设置队列长度(例如 1)和队列等待超时。",
    // ChannelQueueError represents a channel-level admission failure raised by
    // ChannelLimiter inside the connectionTracking middleware.
    //
Docstring mismatch: ChannelQueueError is described as raised by ChannelLimiter inside the connectionTracking middleware, but that middleware was replaced/renamed (this code now wraps errors produced by the channel-limiter middleware). Updating the comment will help future readers find the right code path.
Code Review
This pull request replaces the existing connection tracking with a robust per-channel concurrency admission control system. It introduces a ChannelLimiter that supports both soft-mode scoring and hard-mode FIFO queuing with timeouts, complemented by frontend updates for real-time monitoring and configuration. The review highlights critical improvements needed to prevent memory leaks when disabling limits, avoid nil pointer dereferences in the rate-limit strategy, and correct a scoring inversion logic error to maintain consistent channel ranking under load.
    if cfg.capacity == 0 {
        return nil
    }

    m.mu.RLock()
    if e, ok := m.entries[ch.ID]; ok && e.cfg == cfg {
        m.mu.RUnlock()

        return e.limiter
    }
    m.mu.RUnlock()
The GetOrCreate method does not remove existing limiter entries when a channel's configuration is updated to disable concurrency limits (i.e., when MaxConcurrent is set to 0 or nil). This leads to a memory leak where orphaned ChannelLimiter instances remain in the entries map indefinitely. Furthermore, these orphaned limiters will continue to be reported in metrics and GraphQL queries, providing stale data.
Additionally, this lack of cleanup combined with the logic in RateLimitAwareStrategy.scoreConcurrency can lead to a nil pointer dereference panic if the channel's rate limit settings are removed entirely.
    - if cfg.capacity == 0 {
    -     return nil
    - }
    - m.mu.RLock()
    - if e, ok := m.entries[ch.ID]; ok && e.cfg == cfg {
    -     m.mu.RUnlock()
    -     return e.limiter
    - }
    - m.mu.RUnlock()
    + if cfg.capacity == 0 {
    +     m.mu.RLock()
    +     _, ok := m.entries[ch.ID]
    +     m.mu.RUnlock()
    +     if ok {
    +         m.Forget(ch.ID)
    +     }
    +     return nil
    + }
    + m.mu.RLock()
    + if e, ok := m.entries[ch.ID]; ok && e.cfg == cfg {
    +     m.mu.RUnlock()
    +     return e.limiter
    + }
    + m.mu.RUnlock()
    rl := channel.Settings.RateLimit
    capacity := int(*rl.MaxConcurrent)
This code is susceptible to a nil pointer dereference. If a channel was previously configured with a concurrency limit (so an entry exists in the limiterManager) but is subsequently updated to remove the limit, channel.Settings.RateLimit or rl.MaxConcurrent could be nil. Since limiterManager.Stats might still return ok=true for a short period (or indefinitely if the cleanup issue in GetOrCreate is not addressed), accessing these pointers directly will cause a panic.
    rl := channel.Settings.RateLimit
    if rl == nil || rl.MaxConcurrent == nil || *rl.MaxConcurrent <= 0 {
        return s.maxScore, false
    }
    capacity := int(*rl.MaxConcurrent)

    + if inFlight >= capacity {
    +     waitingRatio := float64(waiting) / float64(queueSize)

    - // Check RPM
    - if rl.RPM != nil && *rl.RPM > 0 {
    -     rpm := s.requestTracker.GetRequestCount(channel.ID)
    -     details["rpm_limit"] = *rl.RPM
    -     details["rpm_current"] = rpm

    -     if rpm >= *rl.RPM {
    -         exhausted = true
    -         details["rpm_exhausted"] = true
    -     } else {
    -         ratio := float64(rpm) / float64(*rl.RPM)
    -         if ratio > maxRatio {
    -             maxRatio = ratio
    +     if details != nil {
    +         details["concurrency_mode"] = "hard_queueing"
    +         details["concurrency_waiting_ratio"] = waitingRatio
          }

    +     return scaleScore(s.maxScore*queueingScoreCeiling, 1-waitingRatio), false
      }
    - }

    - // Check TPM
    - if rl.TPM != nil && *rl.TPM > 0 {
    -     tpm := s.requestTracker.GetTokenCount(channel.ID)
    -     details["tpm_limit"] = *rl.TPM
    -     details["tpm_current"] = tpm
    + ratio := float64(inFlight) / float64(capacity)

    -     if tpm >= *rl.TPM {
    -         exhausted = true
    -         details["tpm_exhausted"] = true
    -     } else {
    -         ratio := float64(tpm) / float64(*rl.TPM)
    -         if ratio > maxRatio {
    -             maxRatio = ratio
    -         }
    + if details != nil {
    +     details["concurrency_mode"] = "hard_below_capacity"
    +     details["concurrency_inflight_ratio"] = ratio
      }
    - }

    - // Check concurrent requests using explicit MaxConcurrent first, then default tracker fallback.
    - if s.connectionTracker != nil {
    -     if concurrencyLimit, source, configured := s.resolveConcurrencyLimit(channel); concurrencyLimit > 0 {
    -         concurrent := s.connectionTracker.GetActiveConnections(channel.ID)
    -         details["concurrent_limit"] = concurrencyLimit
    -         details["concurrent_current"] = concurrent
    -         details["concurrency_limit_source"] = source
    -         details["concurrent_limit_configured"] = configured

    -         if int64(concurrent) >= concurrencyLimit {
    -             exhausted = true
    -             details["concurrent_exhausted"] = true
    -         } else {
    -             ratio := float64(concurrent) / float64(concurrencyLimit)
    -             if ratio > maxRatio {
    -                 maxRatio = ratio
    -             }
    -         }
    -     }
    + return scaleScore(s.maxScore, 1-ratio), false
There is a score inversion logic issue in hard mode. When inFlight < capacity, the score is calculated as s.maxScore * (1 - ratio), which approaches 0 as the channel nears capacity. However, as soon as the channel reaches capacity and starts queueing (inFlight >= capacity), the score jumps to s.maxScore * queueingScoreCeiling * (1 - waitingRatio). With the default queueingScoreCeiling of 0.3, a channel that is 90% busy (score 10) will be ranked lower than a channel that is already at capacity but has an empty queue (score 30).
To ensure monotonic scoring, the non-queueing score range should be shifted to stay above the queueing score range.
    if inFlight >= capacity {
        waitingRatio := float64(waiting) / float64(queueSize)
        if details != nil {
            details["concurrency_mode"] = "hard_queueing"
            details["concurrency_waiting_ratio"] = waitingRatio
        }
        return scaleScore(s.maxScore*queueingScoreCeiling, 1-waitingRatio), false
    }
    ratio := float64(inFlight) / float64(capacity)
    if details != nil {
        details["concurrency_mode"] = "hard_below_capacity"
        details["concurrency_inflight_ratio"] = ratio
    }
    minQueueScore := s.maxScore * queueingScoreCeiling
    return minQueueScore + scaleScore(s.maxScore-minQueueScore, 1-ratio), false

Resolves conflict in internal/server/orchestrator/orchestrator.go:
- Drop the legacy withConnectionTracking line (this branch already removed the ConnectionTracker entirely; ChannelLimiterManager is now the single source of truth for in-flight stats).
- Keep upstream's new captureRawProviderResponse / captureRawProviderStream middlewares at the end of the outbound list so the raw provider response / stream is captured for pass-through, in addition to keeping the withChannelLimiter -> withRateLimitTracking ordering this branch introduced (so local queue rejections don't consume RPM budget).

Other auto-merged files: locales, internal/objects/channel.go, internal/server/orchestrator/outbound.go.

Note: 3 pre-existing race-detector failures in upstream's pass_through_test.go (TestCaptureRawProviderStream_PropagatesError, TestPassThroughStream_LLMMiddlewareRuns, TestPassThroughStream_ErrorPropagates) are introduced by upstream PR #1495 and are unrelated to this merge.
… hard-mode score

Address PR #1503 review findings from copilot/gemini-code-assist:
- ChannelLimiterManager.GetOrCreate now drops the cached entry when capacity transitions to 0, so Stats/Snapshot stop reporting a channel that no longer has a concurrency limit and downstream code can no longer dereference now-nil rate-limit pointers.
- RateLimitAwareStrategy.scoreConcurrency adds a defensive nil check on channel.Settings.RateLimit / MaxConcurrent so a brief stale Stats hit before the next GetOrCreate cannot panic.
- Hard-mode below-capacity score is shifted to [queueCeiling, maxScore] so a 9/10 channel always outranks a 10/10 channel with an empty queue (previously 10 vs 30, so the load balancer preferred the saturated channel).
- zh-CN softModeWarning drops literal **markers** that the dialog renders as plain text.
- channel_queue_error.go docstring refreshed (connectionTracking middleware was renamed to channelLimiter).
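The score inversion and its fix can be checked with a small worked example. This is a sketch under assumptions: scaleScore is modelled as a plain multiplication (the real helper may clamp or round), maxScore is taken as 100 because the review's "10 vs 30" figures imply that scale, and the queue size of 4 is arbitrary.

```go
package main

import "fmt"

// queueingScoreCeiling mirrors the 30% cap mentioned in the PR description.
const queueingScoreCeiling = 0.3

// scaleScore is assumed to be a plain multiplication; the real helper may differ.
func scaleScore(base, factor float64) float64 { return base * factor }

// queueingScore is shared by both variants: at capacity, the score shrinks as the queue fills.
func queueingScore(maxScore float64, waiting, queueSize int) float64 {
	waitingRatio := float64(waiting) / float64(queueSize)
	return scaleScore(maxScore*queueingScoreCeiling, 1-waitingRatio)
}

// scoreBefore is the original hard-mode formula that contains the inversion.
func scoreBefore(maxScore float64, inFlight, capacity, waiting, queueSize int) float64 {
	if inFlight >= capacity {
		return queueingScore(maxScore, waiting, queueSize)
	}
	ratio := float64(inFlight) / float64(capacity)
	return scaleScore(maxScore, 1-ratio)
}

// scoreAfter shifts the below-capacity range so it never drops below the queueing ceiling.
func scoreAfter(maxScore float64, inFlight, capacity, waiting, queueSize int) float64 {
	if inFlight >= capacity {
		return queueingScore(maxScore, waiting, queueSize)
	}
	ratio := float64(inFlight) / float64(capacity)
	minQueueScore := maxScore * queueingScoreCeiling
	return minQueueScore + scaleScore(maxScore-minQueueScore, 1-ratio)
}

func main() {
	const maxScore = 100.0 // assumed scale
	const capacity, queueSize = 10, 4

	for _, inFlight := range []int{0, 5, 9, 10} {
		fmt.Printf("inFlight=%2d/%d before=%6.2f after=%6.2f\n",
			inFlight, capacity,
			scoreBefore(maxScore, inFlight, capacity, 0, queueSize),
			scoreAfter(maxScore, inFlight, capacity, 0, queueSize))
	}
}
```

With the shifted formula the below-capacity score stays strictly above the queueing ceiling for any inFlight < capacity, so the ranking is monotonic in load.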
The channel concurrency limiter middleware held a struct-scoped sync.Once for release. Pipeline.Process re-enters OnOutboundRawRequest on same-channel retry and channel switch, calling Acquire on the same middleware instance again -- but the struct-scoped Once short-circuits every release after the first, permanently leaking the slot. Under sustained upstream 429s with retry enabled, MaxConcurrent slots leak one-by-one until the limiter is fully exhausted and every new request times out in the queue. Replace acquired+releaseOnce with atomic.Pointer[limiterSlot] where each Acquire mints a fresh slot owning its own Once. Stream wrappers capture the slot pointer at wrap time so they release the slot they were minted with, not whatever's current at Close time.
Thanks for the PR. The change is fairly large, so I need your help confirming a few things.
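Question 1: Does this only affect channels that have a rate limit configured? Functionally: yes. Not configured…
Code impact: the cross-cutting changes are fairly large, but every affected location has been updated in step.
I ran a set of 8 end-to-end + unit-test regression checks (using a self-written mock OpenAI upstream + the current HEAD…
Known limitation: P7 failed outright on its first run, which had no sleep: 2 of the 5 new requests were still rejected under the old limiter's capacity. This is the same behaviour described in §15 of the PR description, "brief slack after a hot change to the rateLimit config". It is not a new bug introduced by the PR but a known limitation of its design; after changing the rate-limit config, operators only need to wait a few seconds before observing.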
Lint failed.
Parent tests already call t.Parallel(); subtests must too or the linter flags the asymmetry.
fix
Closes #1130
Summary
PR #1322 only added scoring-layer down-ranking for MaxConcurrent, which was a no-op when a model had only one channel. This PR adds true admission enforcement plus a FIFO queue, observability, and live UI feedback.

Backend
- ChannelLimiter with two modes: soft (count only, preserving PR #1322 behaviour) and hard (FIFO queue + capacity ceiling + per-channel timeout). FIFO uses container/list with slot-handoff Release so released slots transfer directly to the head waiter; this preserves fairness and avoids the "newcomer steals released slot" race.
- ChannelLimiterManager: per-channel limiter cache keyed by limiterConfig struct equality (no hashing); exposes Snapshot() for observability.
- ChannelQueueError: typed sentinel + synthetic *httpclient.Error so errors.AsType[*httpclient.Error] plucks out a 429 + structured JSON body (channel_queue_full / channel_queue_timeout). Deliberately no Retry-After header so the upstream-cooldown middleware does not misclassify local rejections as upstream 429s.
- New ChannelRateLimit fields: QueueSize, QueueTimeoutMs (both *int64, ms units, matching the existing metrics_*_ms convention).
- LB scoring composes RPM/TPM and concurrency sub-scores via min(); hard-mode queueing is capped at 30% so an idle peer keeps a clear advantage.
- Removed legacy ConnectionTracker (and the unused ConnectionAwareStrategy); ChannelLimiterManager is the single source of truth for in-flight stats.
- OpenTelemetry: 5 channel-level instruments (axonhub_channel_inflight, _queue_waiting gauges; _queue_full_total, _queue_timeout_total counters; _queue_wait_seconds histogram).
- ChannelService.ValidateRateLimit rejects negatives and queueSize > 0 without maxConcurrent > 0.
- GraphQL Channel.liveLimiterStats field returning real-time {inFlight, waiting, capacity, queueSize}.

Frontend
- Queue Size and Queue Wait Timeout inputs with cross-field validation; amber advisory when only MaxConcurrent is set (soft-mode warning).
- ChannelLimiterCell rendered inside the existing health column: text-only inFlight/cap + Q waiting/queueSize, color-coded (blue/amber/red) by utilisation, polled every 5 s through the existing channels query.

Two design notes from review
- UpdateChannel no longer calls forgetLimiter: GetOrCreate's cfg-equality check detects real changes without orphaning in-flight slots. forgetLimiter is kept on DeleteChannel / BulkDeleteChannels.

Test plan
- go test -race -count=1 ./internal/... (full sweep green)
- go test -race -count=20 ./internal/server/orchestrator/ (limiter / FIFO fairness / once-protection soak)
- make build clean
- (MaxConcurrent=2, QueueSize=3, QueueTimeoutMs=1500) returns the expected 4 OK + 1 queue_timeout + 3 queue_full mix; mock log confirms upstream in_flight never exceeded 2
- UpdateChannel (remark change) mid-flight does NOT orphan the live limiter; capacity holds at 1 across the update
- Channel.liveLimiterStats GraphQL field reflects in-flight/waiting counts in real time