feat: local resilience — retry, circuit breaker, health checks, supervision#17
…vision (#11)

Add production-grade resilience primitives for local-first operation:

- Retry with exponential backoff + jitter in `@skytwin/core` (`withRetry`, `fetchWithRetry`)
- Generalized `CircuitBreaker` class with half-open probes and backoff
- Per-user circuit breakers in worker to skip persistently failing users
- Retry on transient HTTP errors in Gmail/Calendar connectors and worker→API forwarding
- Split health check: `/health/live` (liveness), `/health/ready` (readiness with DB check)
- Process supervision in `bin/skytwin-dev` (auto-restart on crash, max 5 restarts)
- Docker restart policies (`unless-stopped`) for cockroachdb and api
- Atomic approval execution via `withTransaction`
- Structured logging in worker (`createLogger`)
- 26 new tests for retry and circuit-breaker utilities

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
40ec5cf to 76bf9d7
Pull request overview
This PR introduces local resilience primitives (retry/backoff, circuit breaking, health checks, and basic process supervision) and wires them into core runtime paths (connectors, worker signal forwarding, and API readiness).
Changes:
- Add shared retry utilities (`withRetry`, `fetchWithRetry`) and a generalized `CircuitBreaker` to `@skytwin/core`, plus unit tests.
- Apply retry + circuit breaking in connectors/worker to reduce transient-failure signal loss and skip persistently failing users.
- Add `/api/health/live` and `/api/health/ready`, docker restart/health checks, and supervised dev process launching.
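The retry utilities listed above combine exponential backoff, jitter, and `Retry-After` support. A minimal sketch of how such a delay could be computed follows. Only `baseDelayMs` appears in this PR's call sites; `maxDelayMs`, `jitterRatio`, and the helper names `backoffDelayMs` and `nextDelayMs` are assumptions made for illustration, and the actual `withRetry` implementation may differ.

```typescript
// Hypothetical options; only baseDelayMs is known from the PR's call sites.
interface BackoffOptions {
  baseDelayMs: number; // delay before the first retry
  maxDelayMs: number; // upper bound on any single delay (assumed)
  jitterRatio: number; // fraction of the delay that may be randomized, 0..1 (assumed)
}

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at
// maxDelayMs, then reduced by a random amount of up to jitterRatio of the cap.
function backoffDelayMs(
  attempt: number,
  opts: BackoffOptions,
  random: () => number = Math.random,
): number {
  const exponential = opts.baseDelayMs * Math.pow(2, attempt);
  const capped = Math.min(exponential, opts.maxDelayMs);
  const jitter = capped * opts.jitterRatio * random();
  return Math.round(capped - jitter);
}

// A server-provided Retry-After value (already parsed to milliseconds)
// takes precedence over the computed backoff in this sketch.
function nextDelayMs(
  attempt: number,
  opts: BackoffOptions,
  retryAfterMs: number | null,
  random: () => number = Math.random,
): number {
  return retryAfterMs ?? backoffDelayMs(attempt, opts, random);
}
```

Injecting `random` keeps the function deterministic under test; production callers would rely on the `Math.random` default.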
Reviewed changes
Copilot reviewed 13 out of 15 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pnpm-lock.yaml | Adds workspace links for @skytwin/core usage. |
| packages/core/src/retry.ts | New retry/backoff utility with Retry-After support. |
| packages/core/src/circuit-breaker.ts | New generalized circuit breaker + helper wrapper. |
| packages/core/src/index.ts | Exports retry/circuit breaker APIs for other packages. |
| packages/core/src/tests/retry.test.ts | Unit tests for retry utilities. |
| packages/core/src/tests/circuit-breaker.test.ts | Unit tests for circuit breaker behavior. |
| packages/connectors/src/gmail-connector.ts | Wraps Gmail API calls with retry/backoff. |
| packages/connectors/src/google-calendar-connector.ts | Wraps Calendar API calls with retry/backoff and sync token reset handling. |
| packages/connectors/package.json | Adds @skytwin/core dependency. |
| apps/worker/src/index.ts | Adds per-user circuit breaker + retry on API ingest; structured logging. |
| apps/worker/package.json | Adds @skytwin/core dependency. |
| apps/api/src/routes/approvals.ts | Attempts to persist execution plan+result atomically via withTransaction. |
| apps/api/src/index.ts | Splits health checks into liveness/readiness endpoints. |
| bin/skytwin-dev | Adds process supervision (auto-restart) for dev services. |
| docker-compose.yml | Adds restart policy + healthcheck for cockroachdb/api. |
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported

    return (
      msg.includes('fetch failed') ||
      msg.includes('network') ||
      msg.includes('econnrefused') ||
      msg.includes('econnreset') ||
      msg.includes('etimedout') ||
      msg.includes('dns') ||
      error.name === 'AbortError' ||
      error.name === 'TypeError' // Node fetch throws TypeError for network failures
    );

`isNetworkError()` treats any `error.name === 'TypeError'` as retryable. In Node, many programmer errors are `TypeError`s (e.g., calling something that isn’t a function), so this can accidentally retry non-network bugs and mask real failures. Narrow this condition to `TypeError`s that match known fetch/network failure messages (or remove the name check and rely on message/known error codes).
Suggested change:

    - return (
    -   msg.includes('fetch failed') ||
    -   msg.includes('network') ||
    -   msg.includes('econnrefused') ||
    -   msg.includes('econnreset') ||
    -   msg.includes('etimedout') ||
    -   msg.includes('dns') ||
    -   error.name === 'AbortError' ||
    -   error.name === 'TypeError' // Node fetch throws TypeError for network failures
    - );
    + const hasKnownNetworkMessage =
    +   msg.includes('fetch failed') ||
    +   msg.includes('network') ||
    +   msg.includes('econnrefused') ||
    +   msg.includes('econnreset') ||
    +   msg.includes('etimedout') ||
    +   msg.includes('dns');
    + if (error.name === 'AbortError') {
    +   return true;
    + }
    + if (error.name === 'TypeError') {
    +   return hasKnownNetworkMessage;
    + }
    + return hasKnownNetworkMessage;

    canExecute(): boolean {
      if (this.state === 'closed') return true;

      if (this.state === 'open') {
        const elapsed = Date.now() - (this.openedAt ?? 0);
        if (elapsed >= this.currentResetTimeout) {
          this.state = 'half_open';
          return true;
        }
        return false;
      }

      // half_open - allow one probe
      return true;
    }

`canExecute()` returns `true` for every call in `half_open`, so multiple concurrent requests can pass as “the probe request” even though the docstring says only one probe is allowed. Track whether a half-open probe is already in flight (or transition back to `open`/block additional calls until the probe resolves) to enforce single-probe semantics.
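One way to enforce the single-probe behavior described in this comment is an in-flight flag. The sketch below is a simplified stand-in, not the PR's `CircuitBreaker`: the class name `SingleProbeBreaker`, the `probeInFlight` field, the injectable clock, and the choice to open on the first failure (no failure threshold, no growing reset timeout) are all assumptions made to keep the example small.

```typescript
type BreakerState = 'closed' | 'open' | 'half_open';

// Simplified, hypothetical breaker: opens on the first recorded failure.
class SingleProbeBreaker {
  private state: BreakerState = 'closed';
  private openedAt: number | null = null;
  private probeInFlight = false; // true while the half-open probe is unresolved
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(resetTimeoutMs: number, now: () => number = Date.now) {
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now;
  }

  canExecute(): boolean {
    if (this.state === 'closed') return true;

    if (this.state === 'open') {
      const elapsed = this.now() - (this.openedAt ?? 0);
      if (elapsed < this.resetTimeoutMs) return false;
      this.state = 'half_open';
      this.probeInFlight = false;
    }

    // half_open: hand out exactly one probe until it resolves.
    if (this.probeInFlight) return false;
    this.probeInFlight = true;
    return true;
  }

  recordSuccess(): void {
    this.state = 'closed';
    this.openedAt = null;
    this.probeInFlight = false;
  }

  recordFailure(): void {
    this.state = 'open';
    this.openedAt = this.now();
    this.probeInFlight = false;
  }
}
```

Because the flag is set inside `canExecute()` itself, a second caller arriving before the probe resolves is rejected, while `recordSuccess()` and `recordFailure()` both clear it.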
      for (const connector of userConnectors.connectors) {
        try {
          const signals = await connector.poll();
          for (const signal of signals) {
            await forwardSignalToApi(signal, userConnectors.userId);
          }
        } catch (error) {
    -     console.error(
    -       `[worker] Error polling ${connector.name} for user ${userConnectors.userId}:`,
    -       error instanceof Error ? error.message : error,
    -     );
    +     hadFailure = true;
    +     log.error(`Error polling ${connector.name} for user ${userConnectors.userId}`, {
    +       error: error instanceof Error ? error.message : String(error),
    +     });

`pollUser()` wraps both `connector.poll()` and the per-signal forwarding loop in a single `try`. Since `forwardSignalToApi()` now throws on non-retryable HTTP statuses, a single failed forward will skip forwarding the remaining signals from that connector. Consider catching/logging errors per signal (or making `forwardSignalToApi` swallow non-retryable errors) so one bad ingest doesn’t drop subsequent signals.
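The per-signal isolation suggested here could look roughly like the sketch below. The `Signal` shape, the `forwardAll` helper, and the `ForwardSummary` return type are hypothetical; the `forward` callback stands in for `forwardSignalToApi`, whose real signature is only partly visible in the diff.

```typescript
// Hypothetical minimal signal shape; the real type lives in the connectors package.
interface Signal {
  id: string;
}

type Forward = (signal: Signal, userId: string) => Promise<void>;

interface ForwardSummary {
  forwarded: number;
  failed: Array<{ signalId: string; error: string }>;
}

// Forward every signal, catching failures one at a time so that a single
// rejected ingest does not prevent the remaining signals from being sent.
async function forwardAll(
  signals: Signal[],
  userId: string,
  forward: Forward,
): Promise<ForwardSummary> {
  const summary: ForwardSummary = { forwarded: 0, failed: [] };
  for (const signal of signals) {
    try {
      await forward(signal, userId);
      summary.forwarded += 1;
    } catch (error) {
      summary.failed.push({
        signalId: signal.id,
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
  return summary;
}
```

The caller can then log `summary.failed` and decide whether any failure should count toward the per-user circuit breaker, which keeps that policy out of the forwarding loop.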
    + const listResponse = await withRetry(async () => {
    +   const currentToken = await this.tokenStore.refreshIfExpired(this.userId, 'google');
    +   const response = await fetch(listUrl, {
    +     headers: { Authorization: `Bearer ${currentToken.accessToken}` },
    +   });

    +   if (!response.ok) {
    +     if (response.status === 401) {
    +       // Token expired - refresh will happen on next attempt
    +       throw new RetryableHttpError(401, 'Gmail token expired', null);
    +     }
    +     if ([429, 500, 502, 503].includes(response.status)) {
    +       const retryAfterMs = parseRetryAfter(response.headers.get('Retry-After'));
    +       throw new RetryableHttpError(response.status, `Gmail API list failed: ${response.status}`, retryAfterMs);
    +     }
    -     return this.processListResponse(retryResponse, refreshed.accessToken);
    +     throw new Error(`Gmail API list failed: ${response.status}`);
        }
    -   throw new Error(`Gmail API list failed: ${listResponse.status}`);
    - }

    - return this.processListResponse(listResponse, token.accessToken);
    +   return response;
    + }, { maxRetries: 3, baseDelayMs: 1000 });

    + const currentToken = await this.tokenStore.refreshIfExpired(this.userId, 'google');
    + return this.processListResponse(listResponse, currentToken.accessToken);

After the successful `withRetry(...)` call, the code calls `refreshIfExpired()` again to get `currentToken` just to pass an access token into `processListResponse()`. This adds an extra token refresh/DB roundtrip on every poll even when the token used for the successful request is still valid. Return both `{ response, accessToken }` from the retried function (or otherwise reuse the token from the successful attempt) to avoid the redundant refresh.
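A sketch of the `{ response, accessToken }` shape proposed in this comment follows. The `retrying` function is a deliberately bare stand-in for `withRetry` (no backoff, retries every error) so the example runs on its own; `listWithToken`, `Token`, and the callback parameters are likewise hypothetical names, not the connector's real API.

```typescript
interface Token {
  accessToken: string;
}

// Bare stand-in for withRetry: retries any error immediately, no delay.
async function retrying<T>(fn: () => Promise<T>, maxRetries: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}

// Fetch a token once per attempt and return it alongside the response, so the
// caller can reuse the token from the successful attempt without refreshing again.
async function listWithToken<R>(
  getToken: () => Promise<Token>,
  request: (accessToken: string) => Promise<R>,
): Promise<{ response: R; accessToken: string }> {
  return retrying(async () => {
    const token = await getToken();
    const response = await request(token.accessToken);
    return { response, accessToken: token.accessToken };
  }, 3);
}
```

With this shape the caller would destructure the result and pass `accessToken` straight to `processListResponse()`, so the number of token lookups equals the number of attempts.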
    const planResult = await client.query(
      `INSERT INTO execution_plans (id, decision_id, status, steps, created_at)
       VALUES (gen_random_uuid(), $1, $2, $3, now())
       RETURNING *`,
      [
        approval.decision_id,
        result.status === 'completed' ? 'completed' : 'failed',
        JSON.stringify(result.output?.['stepsCompleted']
          ? [{ type: candidateAction.actionType, status: result.status }]
          : []),
      ],
    );
    const plan = planResult.rows[0]!;

    await client.query(
      `INSERT INTO execution_results (id, plan_id, success, outputs, error, rollback_available, created_at)
       VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, now())`,
      [

The raw SQL inserts here don’t match the current DB schema/repository behavior: `execution_plans` includes a required `action_id` column (see `packages/db/src/schemas/schema.sql`), but the `INSERT` omits it; and `execution_results` uses `completed_at` (not `created_at`), so inserting into `created_at` will fail. Prefer reusing `executionRepository` inside the transaction (or update the SQL to include the correct columns) to avoid schema drift and runtime insert errors.
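If the raw SQL is kept rather than routed through `executionRepository`, the plan insert would need the extra column. The sketch below only illustrates that adjustment: the column list is taken from the diff plus the `action_id` column named in this comment and has not been checked against `schema.sql`, and `buildExecutionPlanInsert`, `InsertQuery`, and the `actionId` input are hypothetical names.

```typescript
// Hypothetical query container, shaped like the (text, values) pair that
// client.query(...) receives in the diff.
interface InsertQuery {
  text: string;
  values: unknown[];
}

interface PlanInput {
  decisionId: string;
  actionId: string; // assumed to be the identifier the action_id column expects
  status: 'completed' | 'failed';
  steps: unknown[];
}

// Build the execution_plans insert with action_id included. Columns other
// than action_id mirror the diff; none are verified against the real schema.
function buildExecutionPlanInsert(input: PlanInput): InsertQuery {
  return {
    text: `INSERT INTO execution_plans (id, decision_id, action_id, status, steps, created_at)
           VALUES (gen_random_uuid(), $1, $2, $3, $4, now())
           RETURNING *`,
    values: [input.decisionId, input.actionId, input.status, JSON.stringify(input.steps)],
  };
}
```

Building the statement in one place keeps the placeholder count and the values array next to each other, which makes this kind of column drift easier to spot in review. The `execution_results` insert would need the analogous `created_at` to `completed_at` change noted above.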
Summary
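- Add retry with exponential backoff + jitter (`withRetry`, `fetchWithRetry`) and generalized `CircuitBreaker` class to `@skytwin/core`
- Split health check: `/health/live` (liveness) and `/health/ready` (readiness with DB check)
- Process supervision in `bin/skytwin-dev` (auto-restart on crash, max 5 restarts)
- Docker `restart: unless-stopped` for cockroachdb and api
- Atomic approval execution via `withTransaction`
- Structured logging in worker via `createLogger`

Closes #11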
Test plan
- … (`pnpm test`)
- … (`pnpm build`)
- … `bin/skytwin-dev` restarts it
- … `/api/health/ready` returns 503 when DB is down

🤖 Generated with Claude Code