Skip to content

Commit 2efc9d6

Browse files
fix: manage SDK stream fiber lifecycle and remove identity helper
- Replace detached Effect.runFork with Effect.forkChild to keep the stream fiber in the managed Effect runtime instead of creating a completely independent root-level fiber. - Store the fiber reference in ClaudeSessionContext and interrupt it in stopSessionInternal before queue/query teardown, eliminating the race window where the fiber could offer events to a shut-down queue. - Remove the no-op asCanonicalTurnId identity function and inline the TurnId value at all call sites. Co-authored-by: Julius Marminge <juliusmarminge@users.noreply.github.com>
1 parent 2b53034 commit 2efc9d6

File tree

1 file changed

+19
-16
lines changed

1 file changed

+19
-16
lines changed

apps/server/src/provider/Layers/ClaudeCodeAdapter.ts

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import {
3434
ThreadId,
3535
TurnId,
3636
} from "@t3tools/contracts";
37-
import { Cause, DateTime, Deferred, Effect, Layer, Queue, Random, Ref, Stream } from "effect";
37+
import { Cause, DateTime, Deferred, Effect, Fiber, Layer, Queue, Random, Ref, Stream } from "effect";
3838

3939
import {
4040
ProviderAdapterProcessError,
@@ -106,6 +106,7 @@ interface ClaudeSessionContext {
106106
lastAssistantUuid: string | undefined;
107107
lastThreadStartedId: string | undefined;
108108
stopped: boolean;
109+
streamFiber: Fiber.Fiber<void, never> | undefined;
109110
}
110111

111112
interface ClaudeQueryRuntime extends AsyncIterable<SDKMessage> {
@@ -144,10 +145,6 @@ function asRuntimeItemId(value: string): RuntimeItemId {
144145
return RuntimeItemId.makeUnsafe(value);
145146
}
146147

147-
function asCanonicalTurnId(value: TurnId): TurnId {
148-
return value;
149-
}
150-
151148
function asRuntimeRequestId(value: ApprovalRequestId): RuntimeRequestId {
152149
return RuntimeRequestId.makeUnsafe(value);
153150
}
@@ -505,7 +502,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
505502
...(typeof message.session_id === "string"
506503
? { providerThreadId: message.session_id }
507504
: {}),
508-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
505+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
509506
...(itemId ? { itemId: ProviderItemId.makeUnsafe(itemId) } : {}),
510507
payload: message,
511508
},
@@ -613,7 +610,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
613610
provider: PROVIDER,
614611
createdAt: stamp.createdAt,
615612
threadId: context.session.threadId,
616-
...(turnState ? { turnId: asCanonicalTurnId(turnState.turnId) } : {}),
613+
...(turnState ? { turnId: turnState.turnId } : {}),
617614
payload: {
618615
message,
619616
class: "provider_error",
@@ -640,7 +637,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
640637
provider: PROVIDER,
641638
createdAt: stamp.createdAt,
642639
threadId: context.session.threadId,
643-
...(turnState ? { turnId: asCanonicalTurnId(turnState.turnId) } : {}),
640+
...(turnState ? { turnId: turnState.turnId } : {}),
644641
payload: {
645642
message,
646643
...(detail !== undefined ? { detail } : {}),
@@ -855,7 +852,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
855852
provider: PROVIDER,
856853
createdAt: stamp.createdAt,
857854
threadId: context.session.threadId,
858-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
855+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
859856
itemId: asRuntimeItemId(tool.itemId),
860857
payload: {
861858
itemType: tool.itemType,
@@ -896,7 +893,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
896893
provider: PROVIDER,
897894
createdAt: stamp.createdAt,
898895
threadId: context.session.threadId,
899-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
896+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
900897
itemId: asRuntimeItemId(tool.itemId),
901898
payload: {
902899
itemType: tool.itemType,
@@ -1006,7 +1003,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
10061003
provider: PROVIDER,
10071004
createdAt: stamp.createdAt,
10081005
threadId: context.session.threadId,
1009-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1006+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
10101007
providerRefs: {
10111008
...providerThreadRef(context),
10121009
...(context.turnState ? { providerTurnId: context.turnState.turnId } : {}),
@@ -1165,7 +1162,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
11651162
provider: PROVIDER,
11661163
createdAt: stamp.createdAt,
11671164
threadId: context.session.threadId,
1168-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1165+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
11691166
providerRefs: {
11701167
...providerThreadRef(context),
11711168
...(context.turnState ? { providerTurnId: context.turnState.turnId } : {}),
@@ -1295,6 +1292,11 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
12951292

12961293
context.stopped = true;
12971294

1295+
if (context.streamFiber) {
1296+
yield* Fiber.interrupt(context.streamFiber);
1297+
context.streamFiber = undefined;
1298+
}
1299+
12981300
for (const [requestId, pending] of context.pendingApprovals) {
12991301
yield* Deferred.succeed(pending.decision, "cancel");
13001302
const stamp = yield* makeEventStamp();
@@ -1304,7 +1306,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
13041306
provider: PROVIDER,
13051307
createdAt: stamp.createdAt,
13061308
threadId: context.session.threadId,
1307-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1309+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
13081310
requestId: asRuntimeRequestId(requestId),
13091311
payload: {
13101312
requestType: pending.requestType,
@@ -1442,7 +1444,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
14421444
provider: PROVIDER,
14431445
createdAt: requestedStamp.createdAt,
14441446
threadId: context.session.threadId,
1445-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1447+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
14461448
requestId: asRuntimeRequestId(requestId),
14471449
payload: {
14481450
requestType,
@@ -1494,7 +1496,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
14941496
provider: PROVIDER,
14951497
createdAt: resolvedStamp.createdAt,
14961498
threadId: context.session.threadId,
1497-
...(context.turnState ? { turnId: asCanonicalTurnId(context.turnState.turnId) } : {}),
1499+
...(context.turnState ? { turnId: context.turnState.turnId } : {}),
14981500
requestId: asRuntimeRequestId(requestId),
14991501
payload: {
15001502
requestType,
@@ -1610,6 +1612,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
16101612
lastAssistantUuid: resumeState?.resumeSessionAt,
16111613
lastThreadStartedId: undefined,
16121614
stopped: false,
1615+
streamFiber: undefined,
16131616
};
16141617
yield* Ref.set(contextRef, context);
16151618
sessions.set(threadId, context);
@@ -1658,7 +1661,7 @@ function makeClaudeCodeAdapter(options?: ClaudeCodeAdapterLiveOptions) {
16581661
providerRefs: {},
16591662
});
16601663

1661-
Effect.runFork(runSdkStream(context));
1664+
context.streamFiber = yield* Effect.forkChild(runSdkStream(context));
16621665

16631666
return {
16641667
...session,

0 commit comments

Comments
 (0)