fix: async InstanceStart calls #869
Conversation
provider.InstanceStart has no guarantee of execution time. When the call hangs, the whole request and the loading page hang with it, which defeats the purpose of a waiting page.
There was a problem hiding this comment.
Pull request overview
This PR aims to prevent the waiting page request from hanging by dispatching provider.InstanceStart asynchronously and tracking in-flight starts to avoid duplicate start calls (per #782).
Changes:
- Added `pendingStarts` tracking on `Sablier` to deduplicate concurrent `InstanceStart` calls.
- Updated `InstanceRequest` to dispatch starts asynchronously instead of synchronously calling `InstanceStart` (and no longer immediately calling `InstanceInspect` on the first request).
- Added unit tests covering async dispatch and deduplication behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pkg/sablier/sablier.go | Adds mutex + pendingStarts map to track in-flight starts. |
| pkg/sablier/instance_request.go | Introduces requestStart helper and switches InstanceRequest to async start dispatch. |
| pkg/sablier/instance_request_test.go | Adds tests for async start dispatch, deduplication, and error/retry behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if ps, exists := s.pendingStarts[name]; exists { | ||
| select { | ||
| case <-ps.done: | ||
| // Goroutine completed | ||
| if ps.err != nil { | ||
| // Return the unconsumed error and clear the entry so the next call retries | ||
| err := ps.err | ||
| delete(s.pendingStarts, name) | ||
| return InstanceInfo{}, fmt.Errorf("instance start failed: %w", err) | ||
| } | ||
| // Succeeded previously but instance is no longer in store — start a new one | ||
| delete(s.pendingStarts, name) | ||
| default: | ||
| // Still running — don't start another goroutine | ||
| s.l.DebugContext(ctx, "instance start already in progress", slog.String("instance", name)) | ||
| return NotReadyInstanceState(name, 0, 1), nil | ||
| } |
There was a problem hiding this comment.
pendingStarts entries are only deleted when requestStart is called again (i.e., when sessions.Get returns ErrKeyNotFound). In the normal flow, this method stores a not-ready state immediately, so subsequent requests won't hit ErrKeyNotFound and completed entries (and any ps.err) will never be consumed/cleared. This can both leak memory (one map entry per instance) and prevent async start errors from ever surfacing. Consider cleaning up finished entries in the goroutine (under pendingMu) and/or checking/consuming pending completion/errors on the state.Status != ready path before InstanceInspect.
| go func() { | ||
| defer close(ps.done) | ||
| if err := s.provider.InstanceStart(context.Background(), name); err != nil { | ||
| ps.err = err |
There was a problem hiding this comment.
The async goroutine calls InstanceStart(context.Background(), name) with no timeout/cancellation. If the provider call hangs indefinitely (the reported issue), the goroutine can leak forever and the instance will be permanently considered "start in progress". Consider using a bounded context (e.g., context.WithTimeout) and, on timeout/error, record the failure and clear pendingStarts[name] to allow retries.
|
|
||
| go func() { | ||
| defer close(ps.done) | ||
| if err := s.provider.InstanceStart(context.Background(), name); err != nil { | ||
| ps.err = err | ||
| s.l.ErrorContext(ctx, "async instance start failed", slog.String("instance", name), slog.Any("error", err)) |
There was a problem hiding this comment.
This goroutine captures and later uses the request-scoped ctx for logging. If InstanceStart hangs, that can retain request-scoped values (potentially including the HTTP request) in memory for a long time. Prefer logging with context.Background() or context.WithoutCancel(ctx) to keep correlation fields without holding the cancellable request context.
| go func() { | |
| defer close(ps.done) | |
| if err := s.provider.InstanceStart(context.Background(), name); err != nil { | |
| ps.err = err | |
| s.l.ErrorContext(ctx, "async instance start failed", slog.String("instance", name), slog.Any("error", err)) | |
| logCtx := context.WithoutCancel(ctx) | |
| go func() { | |
| defer close(ps.done) | |
| if err := s.provider.InstanceStart(context.Background(), name); err != nil { | |
| ps.err = err | |
| s.l.ErrorContext(logCtx, "async instance start failed", slog.String("instance", name), slog.Any("error", err)) |
| sessions.EXPECT().Get(ctx, "nginx").Return(sablier.InstanceInfo{}, store.ErrKeyNotFound).AnyTimes() | ||
| sessions.EXPECT().Put(ctx, sablier.NotReadyInstanceState("nginx", 0, 1), time.Minute).Return(nil).AnyTimes() | ||
|
|
||
| // First call's goroutine fails immediately | ||
| provider.EXPECT().InstanceStart(gomock.Any(), "nginx").Return(errors.New("connection refused")).Times(1) | ||
|
|
||
| // First call succeeds (returns not-ready, goroutine starts) | ||
| info, err := manager.InstanceRequest(ctx, "nginx", time.Minute) | ||
| assert.NilError(t, err) | ||
| assert.Equal(t, info.Status, sablier.InstanceStatus(sablier.InstanceStatusNotReady)) | ||
|
|
||
| // Allow the goroutine to finish setting the error and closing ps.done | ||
| assert.Assert(t, checkWithTimeout(100*time.Millisecond, 5*time.Second, func() bool { | ||
| _, err = manager.InstanceRequest(ctx, "nginx", time.Minute) | ||
| return err != nil | ||
| }), "expected error to be surfaced") | ||
| assert.ErrorContains(t, err, "instance start failed: connection refused") | ||
| } | ||
|
|
||
| func TestInstanceRequest_RetryAfterErrorConsumed(t *testing.T) { | ||
| manager, sessions, provider := setupSablier(t) | ||
| ctx := t.Context() | ||
|
|
||
| secondDone := make(chan struct{}) | ||
|
|
||
| // Allow multiple Get calls: the polling helper may call InstanceRequest several times | ||
| sessions.EXPECT().Get(ctx, "nginx").Return(sablier.InstanceInfo{}, store.ErrKeyNotFound).AnyTimes() | ||
| sessions.EXPECT().Put(ctx, sablier.NotReadyInstanceState("nginx", 0, 1), time.Minute).Return(nil).AnyTimes() | ||
|
|
There was a problem hiding this comment.
These tests stub sessions.Get to return ErrKeyNotFound on every call (AnyTimes()), but InstanceRequest always does a Put immediately after dispatching the async start. With a real store, the next Get should return the stored not-ready state, so the error-consumption path in requestStart likely won't run and async start errors may never be surfaced/retried. Consider updating the mocks to reflect the real Get/Put sequence (2nd call returns the value written by Put) and asserting the intended error surfacing/retry behavior in that scenario.
|
|
✅ All tests successful. No failed tests were found. 📣 Thoughts on this report? Let Codecov know! | Powered by Codecov |
Codecov Report❌ Patch coverage is
|
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Succeeded previously but instance is no longer in store — start a new one | ||
| delete(s.pendingStarts, name) | ||
| default: | ||
| // Still running — don't start another goroutine | ||
| s.l.DebugContext(ctx, "instance start already in progress", slog.String("instance", name)) | ||
| return NotReadyInstanceState(name, 0, 1), nil | ||
| } |
There was a problem hiding this comment.
requestStart returns NotReadyInstanceState(name, 0, 1) with a hard-coded desired replica count. This makes the waiting UI show incorrect (current/desired) values for providers where desired replicas can differ from 1 (e.g., Kubernetes names encode replicas), and it can also mark an already-running instance as not-ready on a cold store miss. Consider doing a quick provider.InstanceInspect (before or immediately after dispatching the async start) and returning that inspected state instead of a placeholder, while still keeping InstanceStart asynchronous.
| go func() { | ||
| defer cancel() | ||
| defer close(ps.done) | ||
| if err := s.provider.InstanceStart(startCtx, name); err != nil { | ||
| ps.err = err | ||
| s.l.Error("async instance start failed", slog.String("instance", name), slog.Any("error", err)) | ||
| } else { |
There was a problem hiding this comment.
In the goroutine error path, the pendingStarts entry is never removed unless a future request calls consumePendingError/requestStart for the same instance. If the first request triggers a failed start and the client never polls again, this leaves a completed entry in-memory indefinitely (one per failed instance). Consider cleaning up the map entry in the goroutine after setting ps.err (optionally keeping the error somewhere with a TTL if it still needs to be surfaced later), or adding an expiry-based cleanup for completed entries.
| // Wait for goroutine to finish and self-clean | ||
| select { | ||
| case <-startDone: | ||
| case <-time.After(5 * time.Second): | ||
| t.Fatal("InstanceStart goroutine never completed") | ||
| } | ||
| // Small settle time for the goroutine to acquire the lock and clean up | ||
| time.Sleep(50 * time.Millisecond) | ||
|
|
||
| // 2nd call: store returns not-ready, no pending entry exists, goes straight to inspect | ||
| sessions.EXPECT().Get(ctx, "nginx").Return(notReady, nil) | ||
| provider.EXPECT().InstanceInspect(ctx, "nginx").Return(ready, nil) | ||
| sessions.EXPECT().Put(ctx, ready, time.Minute).Return(nil) | ||
|
|
||
| info, err = manager.InstanceRequest(ctx, "nginx", time.Minute) | ||
| assert.NilError(t, err) | ||
| assert.Equal(t, info.Status, sablier.InstanceStatus(sablier.InstanceStatusReady)) |
There was a problem hiding this comment.
This test uses a fixed time.Sleep(50ms) to wait for the async start goroutine to clean up the pending entry. That timing can be flaky on slower CI machines (and can fail spuriously if the goroutine hasn't acquired pendingMu yet). Prefer waiting deterministically (e.g., poll until InstanceRequest takes the inspect path, or expose/observe a synchronization point via a channel) instead of sleeping for an arbitrary duration.
| // Wait for goroutine to finish and self-clean | |
| select { | |
| case <-startDone: | |
| case <-time.After(5 * time.Second): | |
| t.Fatal("InstanceStart goroutine never completed") | |
| } | |
| // Small settle time for the goroutine to acquire the lock and clean up | |
| time.Sleep(50 * time.Millisecond) | |
| // 2nd call: store returns not-ready, no pending entry exists, goes straight to inspect | |
| sessions.EXPECT().Get(ctx, "nginx").Return(notReady, nil) | |
| provider.EXPECT().InstanceInspect(ctx, "nginx").Return(ready, nil) | |
| sessions.EXPECT().Put(ctx, ready, time.Minute).Return(nil) | |
| info, err = manager.InstanceRequest(ctx, "nginx", time.Minute) | |
| assert.NilError(t, err) | |
| assert.Equal(t, info.Status, sablier.InstanceStatus(sablier.InstanceStatusReady)) | |
| // Wait for goroutine to finish its start call. | |
| select { | |
| case <-startDone: | |
| case <-time.After(5 * time.Second): | |
| t.Fatal("InstanceStart goroutine never completed") | |
| } | |
| // Poll until the pending entry is cleaned up and InstanceRequest takes the | |
| // inspect path. | |
| sessions.EXPECT().Get(ctx, "nginx").Return(notReady, nil).AnyTimes() | |
| provider.EXPECT().InstanceInspect(ctx, "nginx").Return(ready, nil).Times(1) | |
| sessions.EXPECT().Put(ctx, ready, time.Minute).Return(nil).Times(1) | |
| var lastInfo sablier.InstanceInfo | |
| var lastErr error | |
| assert.Assert(t, checkWithTimeout(50*time.Millisecond, 5*time.Second, func() bool { | |
| lastInfo, lastErr = manager.InstanceRequest(ctx, "nginx", time.Minute) | |
| return lastErr == nil && lastInfo.Status == sablier.InstanceStatus(sablier.InstanceStatusReady) | |
| }), "expected pending entry cleanup to allow inspect path") | |
| assert.NilError(t, lastErr) | |
| assert.Equal(t, lastInfo.Status, sablier.InstanceStatus(sablier.InstanceStatusReady)) |


provider.InstanceStart has no guarantee of execution time. When the call hangs, the whole request and the loading page hang with it, which defeats the purpose of a waiting page.

Closes #782