Skip to content

Commit ea43361

Browse files
authored
Merge pull request #2121 from destinoantagonista-wq/main
Reconcile registry model states on auth changes
2 parents c1818f1 + e08f68e commit ea43361

2 files changed

Lines changed: 117 additions & 1 deletion

File tree

sdk/cliproxy/auth/conductor.go

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,84 @@ func (m *Manager) RefreshSchedulerEntry(authID string) {
234234
m.scheduler.upsertAuth(snapshot)
235235
}
236236

237+
// ReconcileRegistryModelStates aligns per-model runtime state with the current
238+
// registry snapshot for one auth.
239+
//
240+
// Supported models are reset to a clean state because re-registration already
241+
// cleared the registry-side cooldown/suspension snapshot. ModelStates for
242+
// models that are no longer present in the registry are pruned entirely so
243+
// renamed/removed models cannot keep auth-level status stale.
244+
func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID string) {
245+
if m == nil || authID == "" {
246+
return
247+
}
248+
249+
supportedModels := registry.GetGlobalRegistry().GetModelsForClient(authID)
250+
supported := make(map[string]struct{}, len(supportedModels))
251+
for _, model := range supportedModels {
252+
if model == nil {
253+
continue
254+
}
255+
modelKey := canonicalModelKey(model.ID)
256+
if modelKey == "" {
257+
continue
258+
}
259+
supported[modelKey] = struct{}{}
260+
}
261+
262+
var snapshot *Auth
263+
now := time.Now()
264+
265+
m.mu.Lock()
266+
auth, ok := m.auths[authID]
267+
if ok && auth != nil && len(auth.ModelStates) > 0 {
268+
changed := false
269+
for modelKey, state := range auth.ModelStates {
270+
baseModel := canonicalModelKey(modelKey)
271+
if baseModel == "" {
272+
baseModel = strings.TrimSpace(modelKey)
273+
}
274+
if _, supportedModel := supported[baseModel]; !supportedModel {
275+
// Drop state for models that disappeared from the current registry
276+
// snapshot. Keeping them around leaks stale errors into auth-level
277+
// status, management output, and websocket fallback checks.
278+
delete(auth.ModelStates, modelKey)
279+
changed = true
280+
continue
281+
}
282+
if state == nil {
283+
continue
284+
}
285+
if modelStateIsClean(state) {
286+
continue
287+
}
288+
resetModelState(state, now)
289+
changed = true
290+
}
291+
if len(auth.ModelStates) == 0 {
292+
auth.ModelStates = nil
293+
}
294+
if changed {
295+
updateAggregatedAvailability(auth, now)
296+
if !hasModelError(auth, now) {
297+
auth.LastError = nil
298+
auth.StatusMessage = ""
299+
auth.Status = StatusActive
300+
}
301+
auth.UpdatedAt = now
302+
if errPersist := m.persist(ctx, auth); errPersist != nil {
303+
logEntryWithRequestID(ctx).WithField("auth_id", auth.ID).Warnf("failed to persist auth changes during model state reconciliation: %v", errPersist)
304+
}
305+
snapshot = auth.Clone()
306+
}
307+
}
308+
m.mu.Unlock()
309+
310+
if m.scheduler != nil && snapshot != nil {
311+
m.scheduler.upsertAuth(snapshot)
312+
}
313+
}
314+
237315
func (m *Manager) SetSelector(selector Selector) {
238316
if m == nil {
239317
return
@@ -1966,19 +2044,41 @@ func resetModelState(state *ModelState, now time.Time) {
19662044
state.UpdatedAt = now
19672045
}
19682046

2047+
func modelStateIsClean(state *ModelState) bool {
2048+
if state == nil {
2049+
return true
2050+
}
2051+
if state.Status != StatusActive {
2052+
return false
2053+
}
2054+
if state.Unavailable || state.StatusMessage != "" || !state.NextRetryAfter.IsZero() || state.LastError != nil {
2055+
return false
2056+
}
2057+
if state.Quota.Exceeded || state.Quota.Reason != "" || !state.Quota.NextRecoverAt.IsZero() || state.Quota.BackoffLevel != 0 {
2058+
return false
2059+
}
2060+
return true
2061+
}
2062+
19692063
func updateAggregatedAvailability(auth *Auth, now time.Time) {
1970-
if auth == nil || len(auth.ModelStates) == 0 {
2064+
if auth == nil {
2065+
return
2066+
}
2067+
if len(auth.ModelStates) == 0 {
2068+
clearAggregatedAvailability(auth)
19712069
return
19722070
}
19732071
allUnavailable := true
19742072
earliestRetry := time.Time{}
19752073
quotaExceeded := false
19762074
quotaRecover := time.Time{}
19772075
maxBackoffLevel := 0
2076+
hasState := false
19782077
for _, state := range auth.ModelStates {
19792078
if state == nil {
19802079
continue
19812080
}
2081+
hasState = true
19822082
stateUnavailable := false
19832083
if state.Status == StatusDisabled {
19842084
stateUnavailable = true
@@ -2008,6 +2108,10 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
20082108
}
20092109
}
20102110
}
2111+
if !hasState {
2112+
clearAggregatedAvailability(auth)
2113+
return
2114+
}
20112115
auth.Unavailable = allUnavailable
20122116
if allUnavailable {
20132117
auth.NextRetryAfter = earliestRetry
@@ -2027,6 +2131,15 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
20272131
}
20282132
}
20292133

2134+
func clearAggregatedAvailability(auth *Auth) {
2135+
if auth == nil {
2136+
return
2137+
}
2138+
auth.Unavailable = false
2139+
auth.NextRetryAfter = time.Time{}
2140+
auth.Quota = QuotaState{}
2141+
}
2142+
20302143
func hasModelError(auth *Auth, now time.Time) bool {
20312144
if auth == nil || len(auth.ModelStates) == 0 {
20322145
return false

sdk/cliproxy/service.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.A
312312
// This operation may block on network calls, but the auth configuration
313313
// is already effective at this point.
314314
s.registerModelsForAuth(auth)
315+
s.coreManager.ReconcileRegistryModelStates(ctx, auth.ID)
315316

316317
// Refresh the scheduler entry so that the auth's supportedModelSet is rebuilt
317318
// from the now-populated global model registry. Without this, newly added auths
@@ -1027,6 +1028,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool {
10271028
s.ensureExecutorsForAuth(current)
10281029
}
10291030
s.registerModelsForAuth(current)
1031+
s.coreManager.ReconcileRegistryModelStates(context.Background(), current.ID)
10301032

10311033
latest, ok := s.latestAuthForModelRegistration(current.ID)
10321034
if !ok || latest.Disabled {
@@ -1040,6 +1042,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool {
10401042
// no auth fields changed, but keeps the refresh path simple and correct.
10411043
s.ensureExecutorsForAuth(latest)
10421044
s.registerModelsForAuth(latest)
1045+
s.coreManager.ReconcileRegistryModelStates(context.Background(), latest.ID)
10431046
s.coreManager.RefreshSchedulerEntry(current.ID)
10441047
return true
10451048
}

0 commit comments

Comments
 (0)