Skip to content

feat: per-channel concurrency queue + admission control#1503

Merged
looplj merged 6 commits into
looplj:unstablefrom
suixinio:unstable
Apr 27, 2026
Merged

feat: per-channel concurrency queue + admission control#1503
looplj merged 6 commits into
looplj:unstablefrom
suixinio:unstable

Conversation

@suixinio

Copy link
Copy Markdown
Contributor

Closes #1130

Summary

PR #1322 only added scoring-layer down-ranking for MaxConcurrent, which was a no-op when a model had only one channel. This PR adds true admission enforcement plus a FIFO queue, observability, and live UI feedback.

Backend

  • ChannelLimiter with two modes: soft (count only — preserves PR feat: channel concurrency settings, close #1130 #1322 behaviour) and hard (FIFO queue + capacity ceiling + per-channel timeout). FIFO uses container/list with slot-handoff Release so released slots transfer directly to the head waiter — preserves fairness and avoids the "newcomer steals released slot" race.
  • ChannelLimiterManager: per-channel limiter cache keyed by limiterConfig struct equality (no hashing); exposes Snapshot() for observability.
  • ChannelQueueError: typed sentinel + synthetic *httpclient.Error so errors.AsType[*httpclient.Error] plucks out a 429 + structured JSON body (channel_queue_full / channel_queue_timeout). Deliberately no Retry-After header so the upstream-cooldown middleware does not misclassify local rejections as upstream 429s.
  • New ChannelRateLimit fields: QueueSize, QueueTimeoutMs (both *int64, ms units, matching the existing metrics_*_ms convention).
  • Middleware order: channel admission runs before rate-limit tracking so locally rejected requests do not consume RPM budget.
  • LB scoring composes RPM/TPM and concurrency sub-scores via min(); hard-mode queueing is capped at 30 % so an idle peer keeps a clear advantage.
  • Removed the legacy ConnectionTracker (and the unused ConnectionAwareStrategy); ChannelLimiterManager is the single source of truth for in-flight stats.
  • OpenTelemetry: 5 channel-level instruments (axonhub_channel_inflight, _queue_waiting gauges; _queue_full_total, _queue_timeout_total counters; _queue_wait_seconds histogram).
  • ChannelService.ValidateRateLimit rejects negatives and queueSize > 0 without maxConcurrent > 0.
  • New GraphQL field Channel.liveLimiterStats returning real-time {inFlight, waiting, capacity, queueSize}.

Frontend

  • Rate-limit dialog: new Queue Size and Queue Wait Timeout inputs with cross-field validation; amber advisory when only MaxConcurrent is set (soft-mode warning).
  • New ChannelLimiterCell rendered inside the existing health column — text-only inFlight/cap + Q waiting/queueSize, color-coded (blue/amber/red) by utilisation, polled every 5 s through the existing channels query.
  • i18n strings in en + zh-CN.

Two design notes from review

  • Local queue rejection skips RPM increment (middleware order fix).
  • UpdateChannel no longer calls forgetLimiterGetOrCreate's cfg-equality check detects real changes without orphaning in-flight slots. forgetLimiter is kept on DeleteChannel / BulkDeleteChannels.

Test plan

  • go test -race -count=1 ./internal/... — full sweep green
  • go test -race -count=20 ./internal/server/orchestrator/ (limiter / FIFO fairness / once-protection soak)
  • make build clean
  • End-to-end: mock OpenAI upstream + axonhub container; 8-concurrent hammer at (MaxConcurrent=2, QueueSize=3, QueueTimeoutMs=1500) returns the expected 4 OK + 1 queue_timeout + 3 queue_full mix; mock log confirms upstream in_flight never exceeded 2
  • End-to-end: unrelated UpdateChannel (remark change) mid-flight does NOT orphan the live limiter — capacity holds at 1 across the update
  • End-to-end: live Channel.liveLimiterStats GraphQL field reflects in-flight/waiting counts in real time
  • Codex adversarial review run, two findings addressed (RPM bookkeeping, forget-on-update); third (rate-limit hot-change race) documented as known limitation

PR #1322 only added scoring-layer down-ranking, which was a no-op when a
model only had one channel. This change adds true admission enforcement.

Backend:
- ChannelLimiter: soft mode (count only) and hard mode (FIFO queue +
  capacity ceiling + per-channel timeout), with slot-handoff Release for
  FIFO fairness.
- ChannelLimiterManager: per-channel limiter cache keyed by config struct
  equality; exposes Snapshot for observability.
- ChannelQueueError: synthetic 429 + structured body
  (channel_queue_full / channel_queue_timeout). No Retry-After to avoid the
  cooldown middleware misclassifying local rejections as upstream 429s.
- New ChannelRateLimit fields: QueueSize, QueueTimeoutMs.
- Middleware order: channel admission runs before rate-limit tracking so
  locally rejected requests do not consume RPM budget.
- LB scoring composes RPM/TPM and concurrency sub-scores via min(); hard-
  mode queueing capped at 30% so idle peers keep a clear advantage.
- Removed legacy ConnectionTracker; ChannelLimiterManager is the single
  source of truth for in-flight stats.
- OpenTelemetry: 5 channel-level instruments (inflight / queue_waiting
  gauges, queue_full / queue_timeout counters, queue_wait_seconds
  histogram).
- GraphQL Channel.liveLimiterStats field for real-time UI display.
- ValidateRateLimit guards against negatives and queueSize without
  maxConcurrent.

Frontend:
- channels-rate-limit-dialog: queueSize + queueTimeoutMs inputs with
  cross-field validation.
- channel-limiter-cell: text-only inFlight/cap and Q waiting/queueSize
  display, color-coded by utilization, polled every 5s.
- i18n strings for new fields and tooltips (en + zh-CN).
When a user configures only MaxConcurrent and leaves Queue Size empty,
the limiter runs in soft mode — the cap only down-ranks the channel in
load-balancer scoring, it does NOT block excess upstream requests. This
is easy to miss; surface an inline amber advisory under the Queue Size
field so users know to set Queue Size for hard enforcement.
Copilot AI review requested due to automatic review settings April 26, 2026 12:21

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds true per-channel concurrency admission control (hard/soft modes) with FIFO queueing, typed 429 queue errors, OTel observability, and live UI feedback (GraphQL + frontend polling/rendering) to close #1130.

Changes:

  • Backend: introduce ChannelLimiter/ChannelLimiterManager, queue-aware error typing, middleware ordering update, strategy scoring updates, and metrics instrumentation.
  • API/GraphQL: add ChannelRateLimit.QueueSize/QueueTimeoutMs and Channel.liveLimiterStats.
  • Frontend: add queue config inputs + validation, live limiter cell in channels table, and polling to keep stats fresh.

Reviewed changes

Copilot reviewed 58 out of 58 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
internal/server/orchestrator/tester.go Wire test orchestrator to use ChannelLimiterManager instead of legacy connection tracking.
internal/server/orchestrator/rate_limit_tracking_test.go Add regression test ensuring local queue errors don’t trigger cooldown.
internal/server/orchestrator/rate_limit_tracking.go Ignore local queue rejections for cooldown tracking.
internal/server/orchestrator/quota_minute_test.go Update orchestrator construction to use limiter manager.
internal/server/orchestrator/outbound.go Prevent retrying on local queue rejection errors.
internal/server/orchestrator/orchestrator_streaming_test.go Replace connection-tracking tests with queue/RPM ordering + limiter lifecycle coverage.
internal/server/orchestrator/orchestrator_error_test.go Update orchestrator construction to use limiter manager.
internal/server/orchestrator/orchestrator_basic_test.go Update orchestrator construction to use limiter manager.
internal/server/orchestrator/orchestrator.go Add channel limiter middleware (before rate-limit tracking), metrics registration, and strategy wiring.
internal/server/orchestrator/model_circuit_breaker.go Skip recording model errors for local queue rejections.
internal/server/orchestrator/lb_strategy_rate_limit_test.go Expand strategy tests to cover soft/hard concurrency + min-composition with RPM.
internal/server/orchestrator/lb_strategy_rate_limit.go Rework scoring to compose RPM/TPM with concurrency stats from limiter manager.
internal/server/orchestrator/lb_strategy_bp_test.go Remove tests for legacy ConnectionAwareStrategy.
internal/server/orchestrator/lb_strategy_bp.go Remove legacy ConnectionAwareStrategy and its tracker interface.
internal/server/orchestrator/fx_module.go Provide ChannelLimiterManager via Fx.
internal/server/orchestrator/connection_tracking_test.go New tests for channel-limiter middleware behavior and release safety.
internal/server/orchestrator/connection_tracking.go Replace connection-tracking middleware with channel admission middleware.
internal/server/orchestrator/connection_tracker_test.go Remove legacy connection tracker tests.
internal/server/orchestrator/connection_tracker.go Remove legacy in-memory connection tracker.
internal/server/orchestrator/channel_queue_error_test.go Add tests for typed queue error wrapping + 429 shaping without Retry-After.
internal/server/orchestrator/channel_queue_error.go Implement ChannelQueueError + wrapping helpers and detection.
internal/server/orchestrator/channel_limiter_test.go Add unit tests for soft/hard limiter modes, FIFO fairness, and leak prevention.
internal/server/orchestrator/channel_limiter_metrics_test.go Add tests for limiter metrics instruments and callbacks.
internal/server/orchestrator/channel_limiter_metrics.go Implement OTel gauges/counters/histogram for limiter observability.
internal/server/orchestrator/channel_limiter_manager_test.go Add tests for limiter config extraction, caching, rebuild-on-change, and stats.
internal/server/orchestrator/channel_limiter_manager.go Implement per-channel limiter cache keyed by normalized config equality.
internal/server/orchestrator/channel_limiter.go Implement per-channel limiter with soft counting and hard FIFO queueing + timeout.
internal/server/orchestrator/channel_help_test.go Update test helper to construct orchestrator with limiter manager.
internal/server/orchestrator/candidates_loadbalance_test.go Remove connection-tracking-based selector test and simplify selector wiring.
internal/server/orchestrator/candidates_decorator_test.go Update selector wiring to remove connection tracker dependency.
internal/server/orchestrator/candidates_basic_test.go Update selector wiring to remove connection tracker dependency.
internal/server/gql/resolver.go Inject limiter manager into GraphQL resolver root.
internal/server/gql/models_gen.go Add ChannelLimiterStats model type for GraphQL.
internal/server/gql/graphql.go Add limiter manager to GraphQL handler dependencies.
internal/server/gql/generated.go Update generated schema/types for new rate-limit fields + liveLimiterStats.
internal/server/gql/axonhub.resolvers.go Implement Channel.liveLimiterStats resolver using limiter manager stats.
internal/server/gql/axonhub.graphql Extend schema with queue fields and ChannelLimiterStats + liveLimiterStats.
internal/server/biz/channel_rate_limit_test.go Add validation tests for new queue fields and invariants.
internal/server/biz/channel_rate_limit.go Implement ValidateRateLimit for non-negative fields + queue invariants.
internal/server/biz/channel_limiter_hook.go Add biz-layer hook interface to allow orchestrator to forget limiter entries.
internal/server/biz/channel_bulk.go Forget limiter entries on bulk delete.
internal/server/biz/channel.go Validate rate-limit settings on create/update; forget limiter on delete.
internal/server/api/playground.go Pass limiter manager into orchestrator constructor.
internal/server/api/openai.go Pass limiter manager into orchestrator constructor for OpenAI endpoints.
internal/server/api/jina.go Pass limiter manager into orchestrator constructor for Jina endpoints.
internal/server/api/gemini.go Pass limiter manager into orchestrator constructor for Gemini endpoints.
internal/server/api/doubao.go Pass limiter manager into orchestrator constructor for Doubao endpoints.
internal/server/api/anthropic.go Pass limiter manager into orchestrator constructor for Anthropic endpoints.
internal/server/api/aisdk.go Pass limiter manager into orchestrator constructor for AI SDK endpoints.
internal/objects/channel.go Add QueueSize and QueueTimeoutMs to channel rate-limit settings.
frontend/src/routeTree.gen.ts Regenerate route typings/paths (notably trailing-slash fullPaths).
frontend/src/locales/zh-CN/channels.json Add limiter/queue UI strings (zh-CN).
frontend/src/locales/en/channels.json Add limiter/queue UI strings (en).
frontend/src/features/channels/data/schema.ts Extend channel schemas to include queue fields and liveLimiterStats.
frontend/src/features/channels/data/channels.ts Query queue fields + liveLimiterStats; poll channels query every 5s.
frontend/src/features/channels/components/channels-rate-limit-dialog.tsx Add queue inputs, cross-field validation, and soft-mode advisory UI.
frontend/src/features/channels/components/channels-columns.tsx Render ChannelLimiterCell under health column when stats present.
frontend/src/features/channels/components/channel-limiter-cell.tsx New compact, color-coded cell with tooltip for in-flight/queue stats.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +215 to 217
rl := channel.Settings.RateLimit
capacity := int(*rl.MaxConcurrent)

Copilot AI Apr 26, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scoreConcurrency assumes channel.Settings.RateLimit and RateLimit.MaxConcurrent are non-nil when a limiter entry exists (ok==true). If a channel previously had a limiter but is later updated to remove/clear MaxConcurrent (or Settings/RateLimit), ChannelLimiterManager will still return ok==true and this will panic on channel.Settings.RateLimit / *rl.MaxConcurrent. Add defensive checks here (treat as “no limiter configured” and return maxScore) and/or ensure the manager drops stale entries when concurrency limiting is disabled so Stats() can’t return ok==true for a now-unlimited channel.

Copilot uses AI. Check for mistakes.
Comment on lines +70 to +74
func (m *ChannelLimiterManager) GetOrCreate(ch *biz.Channel) *ChannelLimiter {
cfg := extractLimiterConfig(ch)
if cfg.capacity == 0 {
return nil
}

Copilot AI Apr 26, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetOrCreate returns nil when concurrency limiting is disabled (cfg.capacity == 0), but it does not remove any existing cached entry for that channel. If a channel used to have MaxConcurrent set and later clears it, the old limiter remains in entries, so Stats()/Snapshot() will still report load and (more importantly) scoring code may assume MaxConcurrent is non-nil. Consider deleting the existing entry when cfg.capacity == 0 (e.g., Forget(ch.ID) under lock) to prevent stale stats and potential nil-deref panics in callers.

Copilot uses AI. Check for mistakes.
Comment on lines +591 to +595
// Intentionally NO forgetLimiter call: ChannelLimiterManager.GetOrCreate
// already detects rate-limit changes via cfg equality and rebuilds on the
// next request. Calling Forget on every update (including unrelated
// settings) would orphan in-flight slots and let the next batch of
// requests transiently exceed MaxConcurrent.

Copilot AI Apr 26, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpdateChannel never calls forgetLimiter anymore. That’s fine for unrelated updates, but when an update disables concurrency limiting (e.g., sets settings.rateLimit.maxConcurrent to nil/0 or clears settings.rateLimit), the orchestrator’s ChannelLimiterManager can retain a stale limiter entry indefinitely unless something explicitly forgets it. Suggest calling forgetLimiter(id) only for the “limiter disabled” case (or otherwise ensuring the manager drops entries when MaxConcurrent is cleared) to avoid stale stats and potential nil-deref panics in downstream scoring.

Copilot uses AI. Check for mistakes.
"channels.dialogs.rateLimit.fields.queueTimeoutMs.placeholder": "毫秒,留空表示不超时",
"channels.dialogs.rateLimit.fields.queueTimeoutMs.description": "请求在队列中可等待的最长时间(毫秒),超时后以 HTTP 429 拒绝。仅在设置队列长度时生效。",
"channels.dialogs.rateLimit.errors.queueRequiresMaxConcurrent": "设置排队队列长度时,必须同时设置最大并发数(大于 0)。",
"channels.dialogs.rateLimit.hints.softModeWarning": "未设置队列长度时,最大并发数仅在负载均衡评分中将此渠道降权,**不会真正阻塞超出的请求**。如需硬性限制,请同时设置队列长度(例如 1)和队列等待超时。",

Copilot AI Apr 26, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The zh-CN string channels.dialogs.rateLimit.hints.softModeWarning contains markdown-style **...**, but it’s rendered as plain text in the dialog (inside a <span>). This will likely show the asterisks literally. Consider removing the markdown markers or switching to a rendering path that supports emphasis consistently across locales.

Suggested change
"channels.dialogs.rateLimit.hints.softModeWarning": "未设置队列长度时,最大并发数仅在负载均衡评分中将此渠道降权,**不会真正阻塞超出的请求**。如需硬性限制,请同时设置队列长度(例如 1)和队列等待超时。",
"channels.dialogs.rateLimit.hints.softModeWarning": "未设置队列长度时,最大并发数仅在负载均衡评分中将此渠道降权,不会真正阻塞超出的请求。如需硬性限制,请同时设置队列长度(例如 1)和队列等待超时。",

Copilot uses AI. Check for mistakes.
Comment on lines +18 to +20
// ChannelQueueError represents a channel-level admission failure raised by
// ChannelLimiter inside the connectionTracking middleware.
//

Copilot AI Apr 26, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring mismatch: ChannelQueueError is described as raised by ChannelLimiter inside the connectionTracking middleware, but that middleware was replaced/renamed (this code now wraps errors produced by the channel-limiter middleware). Updating the comment will help future readers find the right code path.

Copilot uses AI. Check for mistakes.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request replaces the existing connection tracking with a robust per-channel concurrency admission control system. It introduces a ChannelLimiter that supports both soft-mode scoring and hard-mode FIFO queuing with timeouts, complemented by frontend updates for real-time monitoring and configuration. The review highlights critical improvements needed to prevent memory leaks when disabling limits, avoid nil pointer dereferences in the rate-limit strategy, and correct a scoring inversion logic error to maintain consistent channel ranking under load.

Comment on lines +72 to +82
if cfg.capacity == 0 {
return nil
}

m.mu.RLock()
if e, ok := m.entries[ch.ID]; ok && e.cfg == cfg {
m.mu.RUnlock()

return e.limiter
}
m.mu.RUnlock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The GetOrCreate method does not remove existing limiter entries when a channel's configuration is updated to disable concurrency limits (i.e., when MaxConcurrent is set to 0 or nil). This leads to a memory leak where orphaned ChannelLimiter instances remain in the entries map indefinitely. Furthermore, these orphaned limiters will continue to be reported in metrics and GraphQL queries, providing stale data.

Additionally, this lack of cleanup combined with the logic in RateLimitAwareStrategy.scoreConcurrency can lead to a nil pointer dereference panic if the channel's rate limit settings are removed entirely.

Suggested change
if cfg.capacity == 0 {
return nil
}
m.mu.RLock()
if e, ok := m.entries[ch.ID]; ok && e.cfg == cfg {
m.mu.RUnlock()
return e.limiter
}
m.mu.RUnlock()
if cfg.capacity == 0 {
m.mu.RLock()
_, ok := m.entries[ch.ID]
m.mu.RUnlock()
if ok {
m.Forget(ch.ID)
}
return nil
}
m.mu.RLock()
if e, ok := m.entries[ch.ID]; ok && e.cfg == cfg {
m.mu.RUnlock()
return e.limiter
}
m.mu.RUnlock()

Comment on lines +215 to +216
rl := channel.Settings.RateLimit
capacity := int(*rl.MaxConcurrent)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This code is susceptible to a nil pointer dereference. If a channel was previously configured with a concurrency limit (so an entry exists in the limiterManager) but is subsequently updated to remove the limit, channel.Settings.RateLimit or rl.MaxConcurrent could be nil. Since limiterManager.Stats might still return ok=true for a short period (or indefinitely if the cleanup issue in GetOrCreate is not addressed), accessing these pointers directly will cause a panic.

	rl := channel.Settings.RateLimit
	if rl == nil || rl.MaxConcurrent == nil || *rl.MaxConcurrent <= 0 {
		return s.maxScore, false
	}
	capacity := int(*rl.MaxConcurrent)

Comment on lines +239 to +257
if inFlight >= capacity {
waitingRatio := float64(waiting) / float64(queueSize)

// Check RPM
if rl.RPM != nil && *rl.RPM > 0 {
rpm := s.requestTracker.GetRequestCount(channel.ID)
details["rpm_limit"] = *rl.RPM
details["rpm_current"] = rpm

if rpm >= *rl.RPM {
exhausted = true
details["rpm_exhausted"] = true
} else {
ratio := float64(rpm) / float64(*rl.RPM)
if ratio > maxRatio {
maxRatio = ratio
if details != nil {
details["concurrency_mode"] = "hard_queueing"
details["concurrency_waiting_ratio"] = waitingRatio
}

return scaleScore(s.maxScore*queueingScoreCeiling, 1-waitingRatio), false
}
}

// Check TPM
if rl.TPM != nil && *rl.TPM > 0 {
tpm := s.requestTracker.GetTokenCount(channel.ID)
details["tpm_limit"] = *rl.TPM
details["tpm_current"] = tpm
ratio := float64(inFlight) / float64(capacity)

if tpm >= *rl.TPM {
exhausted = true
details["tpm_exhausted"] = true
} else {
ratio := float64(tpm) / float64(*rl.TPM)
if ratio > maxRatio {
maxRatio = ratio
}
if details != nil {
details["concurrency_mode"] = "hard_below_capacity"
details["concurrency_inflight_ratio"] = ratio
}
}

// Check concurrent requests using explicit MaxConcurrent first, then default tracker fallback.
if s.connectionTracker != nil {
if concurrencyLimit, source, configured := s.resolveConcurrencyLimit(channel); concurrencyLimit > 0 {
concurrent := s.connectionTracker.GetActiveConnections(channel.ID)
details["concurrent_limit"] = concurrencyLimit
details["concurrent_current"] = concurrent
details["concurrency_limit_source"] = source
details["concurrent_limit_configured"] = configured

if int64(concurrent) >= concurrencyLimit {
exhausted = true
details["concurrent_exhausted"] = true
} else {
ratio := float64(concurrent) / float64(concurrencyLimit)
if ratio > maxRatio {
maxRatio = ratio
}
}
}
return scaleScore(s.maxScore, 1-ratio), false

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a score inversion logic issue in hard mode. When inFlight < capacity, the score is calculated as s.maxScore * (1 - ratio), which approaches 0 as the channel nears capacity. However, as soon as the channel reaches capacity and starts queueing (inFlight >= capacity), the score jumps to s.maxScore * queueingScoreCeiling * (1 - waitingRatio). With the default queueingScoreCeiling of 0.3, a channel that is 90% busy (score 10) will be ranked lower than a channel that is already at capacity but has an empty queue (score 30).

To ensure monotonic scoring, the non-queueing score range should be shifted to stay above the queueing score range.

		if inFlight >= capacity {
			waitingRatio := float64(waiting) / float64(queueSize)

			if details != nil {
				details["concurrency_mode"] = "hard_queueing"
				details["concurrency_waiting_ratio"] = waitingRatio
			}

			return scaleScore(s.maxScore*queueingScoreCeiling, 1-waitingRatio), false
		}

		ratio := float64(inFlight) / float64(capacity)

		if details != nil {
			details["concurrency_mode"] = "hard_below_capacity"
			details["concurrency_inflight_ratio"] = ratio
		}

		minQueueScore := s.maxScore * queueingScoreCeiling
		return minQueueScore + scaleScore(s.maxScore-minQueueScore, 1-ratio), false

Resolves conflict in internal/server/orchestrator/orchestrator.go:
- Drop the legacy withConnectionTracking line (this branch already removed
  the ConnectionTracker entirely; ChannelLimiterManager is now the single
  source of truth for in-flight stats).
- Keep upstream's new captureRawProviderResponse / captureRawProviderStream
  middlewares at the end of the outbound list so the raw provider response
  / stream is captured for pass-through, in addition to keeping the
  withChannelLimiter -> withRateLimitTracking ordering this branch
  introduced (so local queue rejections don't consume RPM budget).

Other auto-merged files: locales, internal/objects/channel.go,
internal/server/orchestrator/outbound.go.

Note: 3 pre-existing race-detector failures in upstream's
pass_through_test.go (TestCaptureRawProviderStream_PropagatesError,
TestPassThroughStream_LLMMiddlewareRuns, TestPassThroughStream_ErrorPropagates)
are introduced by upstream PR #1495 and are unrelated to this merge.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

… hard-mode score

Address PR #1503 review findings from copilot/gemini-code-assist:

- ChannelLimiterManager.GetOrCreate now drops the cached entry when
  capacity transitions to 0, so Stats/Snapshot stop reporting a channel
  that no longer has a concurrency limit and downstream code can no
  longer dereference now-nil rate-limit pointers.
- RateLimitAwareStrategy.scoreConcurrency adds a defensive nil check on
  channel.Settings.RateLimit / MaxConcurrent so a brief stale Stats hit
  before the next GetOrCreate cannot panic.
- Hard-mode below-capacity score is shifted to [queueCeiling, maxScore]
  so a 9/10 channel always outranks a 10/10 channel with an empty queue
  (previously 10 vs 30 — load balancer preferred the saturated channel).
- zh-CN softModeWarning drops literal **markers** that the dialog renders
  as plain text.
- channel_queue_error.go docstring refreshed (connectionTracking middleware
  was renamed to channelLimiter).

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

The channel concurrency limiter middleware held a struct-scoped sync.Once for
release. Pipeline.Process re-enters OnOutboundRawRequest on same-channel retry
and channel switch, calling Acquire on the same middleware instance again --
but the struct-scoped Once short-circuits every release after the first,
permanently leaking the slot. Under sustained upstream 429s with retry
enabled, MaxConcurrent slots leak one-by-one until the limiter is fully
exhausted and every new request times out in the queue.

Replace acquired+releaseOnce with atomic.Pointer[limiterSlot] where each
Acquire mints a fresh slot owning its own Once. Stream wrappers capture the
slot pointer at wrap time so they release the slot they were minted with,
not whatever's current at Close time.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@looplj

looplj commented Apr 27, 2026

Copy link
Copy Markdown
Owner

感谢 PR,变更有点大,需要帮忙确认下

  1. 是否只影响配置了 rate limit 的渠道,没有的话,就完全不影响
  2. 本地是否有真实验证过

@suixinio

Copy link
Copy Markdown
Contributor Author

谢谢PR,变更有点大,需要帮忙确认下

  1. 是否只影响配置了速率限制的渠道,没有的话,就完全不影响
  2. 本地是否有真实验证过
  1. 我确认下,
  2. 从昨天开始部署跑了,目前没有发现问题。

@suixinio

Copy link
Copy Markdown
Contributor Author

@looplj

问题 1:是否只影响配置了 rate limit 的渠道?

功能层面:是

没配 MaxConcurrent 的渠道:

  • ChannelLimiterManager.GetOrCreate 返回 nil,admission 中间件直接放行 → 与 PR 前等价
  • 前端 ChannelLimiterCell 仅在 liveLimiterStats != null 时渲染,列表 UI 无视觉变化
  • LB 评分中并发子分走「未启用 limiter」分支 → score_concurrent = maxScore,最终评分仅由 RPM/TPM/其他既有策略决定

代码影响层面:横切改动较大,但已在所有受影响位置同步更新

维度 改动点
DI / 构造函数 7 个 API handler(aisdk / anthropic / doubao / gemini / jina / openai / playground)的 Params 都加了 ChannelLimiterManagerfx.ModuleProvide(NewChannelLimiterManager)
中间件链 withChannelLimiter 替换 withConnectionTracking顺序约束:必须早于 withRateLimitTracking(否则本地拒绝会扣 RPM 配额,已加专项回归测试 Process_QueueRejectionDoesNotConsumeRPM 守住)
错误分类 新增 ChannelQueueError(synthetic 429 + JSON body,无 Retry-After,避免 cooldown 中间件误判)
重试 / 熔断 / cooldown outbound.CanRetrymodel_circuit_breakerrate_limit_tracking 三处都加了 isChannelQueueError 早返分支,确保本地拒绝不会污染上述既有逻辑
删除项 ConnectionTracker 接口 + DefaultConnectionTracker + ConnectionAwareStrategy + lb_strategy_bp.go 整段,单数据源由 ChannelLimiterManager 接管
GraphQL schema ChannelRateLimitqueueSize / queueTimeoutMs;新增 Channel.liveLimiterStats: ChannelLimiterStats(forceResolver)+ 配套 gqlgen 生成代码
Channel CRUD Create/Update 走 ValidateRateLimitUpdate 故意不调 forgetLimiter(靠 cfg-equality 重建,避免孤立 in-flight 槽位);Delete / BulkDelete 仍调 forget
前端 列表查询多取 liveLimiterStats 字段并加 refetchInterval: 5000;rate-limit 对话框新增 Queue Size / Queue Wait Timeout 输入与跨字段校验
可观测性 5 个 OTel 指标:axonhub_channel_inflight_queue_waiting_queue_full_total_queue_timeout_total_queue_wait_seconds

跑了一组 8 项的端到端 + 单测回归(用自写的 mock OpenAI upstream + 当前 HEAD 31b53ebf 编译的 axonhub 二进制),全部 PASS

# 场景 结果 关键证据
P1 多渠道 LB,两个 channel 都不配 rate limit ✅ 30/30 OK 两个 mock 都被命中(20/10),证明未配 rate limit 的路径未受影响
P2 上游真实 429 + Retry-After 触发 cooldown 客户端两次 429;axonhub log 多次输出 channel cooling down due to 429,cooldown 既有路径未坏
P3 同渠道重试可恢复错误(首次 500,第二次 200) 客户端最终 200,mock 命中 2 次,证明 CanRetry 加 queue-error 分支后普通重试仍工作
P4 502 透传不被新代码吞掉 5 次连续请求全部以 502 透传,circuit-breaker 接受真实错误的能力未坏
P5 列表查询性能(50 个 channel + liveLimiterStats + 5 秒轮询) 单次查询 21–22 ms,可接受
P6 新 DI 图启动 axonhub 干净启动,新加的 ChannelLimiterManager 注入 7 个 API handler 全部 OK
P7 MaxConcurrent 热修改(不带 forgetLimiter 的路径) 改前 MaxConcurrent=1, QueueSize=2:4 并发返回 3 OK + 1 queue_full(FIFO+拒绝符合预期);改后 MaxConcurrent=4 软模式:5 并发全 OK
P8 限流相关单测 race + count=20 7.3s 全绿,覆盖软/硬模式、FIFO 公平性、超时、ctx 取消、once 保护、cfg-equality 重建、isChannelQueueError 分类、以及合入后两次 fix commit (5522a16e31b53ebf) 的回归路径

已知限制

P7 第一次跑时未加 sleep 直接失败:5 个新请求中 2 个仍按旧 limiter 容量被拒绝。这是 PR 自述里 §15「rateLimit 配置热修改的短暂松弛」描述的同一现象 —— enabledChannelsCache 的 watcher 通知是异步的,约 1 秒窗口内新请求可能看到旧配置。

不是 PR 引入的新 bug,是其设计上的 known limitation,运维改 rate-limit 配置后稍等几秒再观察即可。

@looplj

looplj commented Apr 27, 2026

Copy link
Copy Markdown
Owner

lint 挂了

Parent tests already call t.Parallel(); subtests must too or the linter
flags the asymmetry.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@suixinio

Copy link
Copy Markdown
Contributor Author

lint 挂了

fix

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature/功能]: 增加同供应商请求排队机制,限制并发数以降低 Rate Limit 错误

3 participants