Skip to content

Commit 3b1a3b2

Browse files
committed
fix(agents): serialize new-session resolution per session key
Concurrent commands for the same session key minted separate sessionIds because resolveSession ran before the per-session execution lane, so a second OpenAI-compatible request that arrived mid-turn forked an isolated, memory-less session. Reserve the brand-new-session path per session key so concurrent commands adopt one sessionId. Fixes #84575
1 parent 4925f84 commit 3b1a3b2

3 files changed

Lines changed: 181 additions & 2 deletions

File tree

src/agents/agent-command.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ import {
7070
resolveInternalEventTranscriptBody,
7171
} from "./command/attempt-execution.shared.js";
7272
import { resolveAgentRunContext } from "./command/run-context.js";
73-
import { resolveSession } from "./command/session.js";
73+
import { resolveSessionWithReservation } from "./command/session-resolution-reservation.js";
7474
import type { AgentCommandIngressOpts, AgentCommandOpts } from "./command/types.js";
7575
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "./defaults.js";
7676
import { classifyEmbeddedAgentRunResultForModelFallback } from "./embedded-agent-runner/result-fallback-classifier.js";
@@ -449,13 +449,14 @@ async function prepareAgentCommandExecution(opts: AgentCommandOpts, runtime: Run
449449
overrideSeconds: timeoutSecondsRaw,
450450
});
451451

452-
const sessionResolution = resolveSession({
452+
const sessionResolution = await resolveSessionWithReservation({
453453
cfg,
454454
to: opts.to,
455455
sessionId: opts.sessionId,
456456
sessionKey: explicitSessionKey,
457457
agentId: agentIdOverride,
458458
clone: false,
459+
suppressVisibleSessionEffects: opts.sessionEffects === "internal",
459460
});
460461

461462
const { sessionId, sessionKey, storePath, isNewSession, persistedThinking, persistedVerbose } =
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import fs from "node:fs/promises";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import { afterEach, describe, expect, it } from "vitest";
5+
import { clearSessionStoreCaches } from "../../config/sessions/store-cache.js";
6+
import { loadSessionStore } from "../../config/sessions/store.js";
7+
import type { OpenClawConfig } from "../../config/types.openclaw.js";
8+
import { resolveSessionWithReservation } from "./session-resolution-reservation.js";
9+
import { resolveSession } from "./session.js";
10+
11+
async function withTempStore<T>(
12+
run: (params: { cfg: OpenClawConfig; storePath: string }) => Promise<T>,
13+
): Promise<T> {
14+
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-reservation-"));
15+
const storePath = path.join(dir, "sessions.json");
16+
const cfg = { session: { store: storePath, mainKey: "main" } } as OpenClawConfig;
17+
try {
18+
return await run({ cfg, storePath });
19+
} finally {
20+
clearSessionStoreCaches();
21+
await fs.rm(dir, { recursive: true, force: true });
22+
}
23+
}
24+
25+
afterEach(() => {
26+
clearSessionStoreCaches();
27+
});
28+
29+
describe("resolveSessionWithReservation", () => {
30+
const sessionKey = "agent:main:finn:c1";
31+
32+
it("forks distinct sessionIds without the reservation lock (regression repro)", async () => {
33+
await withTempStore(async ({ cfg }) => {
34+
const first = resolveSession({ cfg, sessionKey });
35+
const second = resolveSession({ cfg, sessionKey });
36+
expect(first.isNewSession).toBe(true);
37+
expect(second.isNewSession).toBe(true);
38+
// Without serialization both requests mint their own id, so the second
39+
// request runs in an isolated, memory-less session.
40+
expect(first.sessionId).not.toBe(second.sessionId);
41+
});
42+
});
43+
44+
it("gives concurrent same-key requests one shared sessionId", async () => {
45+
await withTempStore(async ({ cfg, storePath }) => {
46+
const [first, second] = await Promise.all([
47+
resolveSessionWithReservation({ cfg, sessionKey }),
48+
resolveSessionWithReservation({ cfg, sessionKey }),
49+
]);
50+
expect(first.sessionId).toBe(second.sessionId);
51+
// Exactly one resolution created the session; the other adopted it.
52+
expect([first.isNewSession, second.isNewSession].filter(Boolean)).toHaveLength(1);
53+
// The reserved mapping is persisted so later turns resume the same session.
54+
const persisted = loadSessionStore(storePath, { skipCache: true })[sessionKey];
55+
expect(persisted?.sessionId).toBe(first.sessionId);
56+
});
57+
});
58+
59+
it("reuses the reserved id for a follow-up after the first request", async () => {
60+
await withTempStore(async ({ cfg }) => {
61+
const first = await resolveSessionWithReservation({ cfg, sessionKey });
62+
const second = await resolveSessionWithReservation({ cfg, sessionKey });
63+
expect(second.sessionId).toBe(first.sessionId);
64+
expect(second.isNewSession).toBe(false);
65+
});
66+
});
67+
68+
it("leaves the explicit sessionId path unchanged", async () => {
69+
await withTempStore(async ({ cfg }) => {
70+
const resolution = await resolveSessionWithReservation({
71+
cfg,
72+
sessionId: "explicit-123",
73+
});
74+
expect(resolution.sessionId).toBe("explicit-123");
75+
});
76+
});
77+
78+
it("does not write a visible store row for internal handoffs (suppressVisibleSessionEffects)", async () => {
79+
await withTempStore(async ({ cfg, storePath }) => {
80+
const resolution = await resolveSessionWithReservation({
81+
cfg,
82+
sessionKey,
83+
suppressVisibleSessionEffects: true,
84+
});
85+
expect(resolution.isNewSession).toBe(true);
86+
expect(resolution.sessionId).toBeTruthy();
87+
// Internal handoffs must not leak a visible session-store row.
88+
const persisted = loadSessionStore(storePath, { skipCache: true })[sessionKey];
89+
expect(persisted).toBeUndefined();
90+
});
91+
});
92+
});
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import type { OpenClawConfig } from "../../config/types.openclaw.js";
2+
import { persistSessionEntry } from "./attempt-execution.shared.js";
3+
import { resolveSession, type SessionResolution } from "./session.js";
4+
5+
// Per-session-key serialization for resolving a brand-new session identity.
6+
//
7+
// Concurrent commands that target the same session key must agree on a single
8+
// `sessionId`. `resolveSession` mints a fresh `crypto.randomUUID()` whenever the
9+
// key has no fresh stored entry, and that resolution runs before the per-session
10+
// execution lane is entered (the lane is keyed by session key but only
11+
// serializes execution). Two requests that race here -- e.g. a follow-up sent
12+
// while the first turn is still in flight on the OpenAI-compatible endpoint --
13+
// each mint their own id, so the later one runs in an isolated, memory-less
14+
// session. Serializing the mint-and-reserve step closes that window: the first
15+
// request persists the key-to-id mapping and any concurrent request adopts it
16+
// instead of forking a second session.
17+
const SESSION_RESERVATION_QUEUE = new Map<string, Promise<unknown>>();
18+
19+
async function serializeReservation<T>(key: string, task: () => Promise<T>): Promise<T> {
20+
const prev = SESSION_RESERVATION_QUEUE.get(key) ?? Promise.resolve();
21+
const next = prev.then(task, task);
22+
SESSION_RESERVATION_QUEUE.set(key, next);
23+
try {
24+
return await next;
25+
} finally {
26+
if (SESSION_RESERVATION_QUEUE.get(key) === next) {
27+
SESSION_RESERVATION_QUEUE.delete(key);
28+
}
29+
}
30+
}
31+
32+
export type ResolveSessionInput = {
33+
cfg: OpenClawConfig;
34+
to?: string;
35+
sessionId?: string;
36+
sessionKey?: string;
37+
agentId?: string;
38+
clone?: boolean;
39+
/**
40+
* Internal handoffs (sessionEffects === "internal") must not write visible
41+
* session-store rows. Skip the reservation persist entirely; internal runs
42+
* are orchestrated, not user-fired, so there is no same-key race to protect.
43+
*/
44+
suppressVisibleSessionEffects?: boolean;
45+
};
46+
47+
/**
48+
* Resolve a session like {@link resolveSession}, but serialize the brand-new
49+
* session path per session key so concurrent commands cannot fork separate
50+
* sessions for the same key.
51+
*/
52+
export async function resolveSessionWithReservation(
53+
opts: ResolveSessionInput,
54+
): Promise<SessionResolution> {
55+
const resolution = resolveSession(opts);
56+
// Only a brand-new session mints a random id and is therefore racy. An
57+
// explicit session id or an existing fresh entry already pins the identity,
58+
// so those paths skip the lock entirely and keep steady-state turns lock-free.
59+
// Internal handoffs also skip reservation so they cannot create a visible
60+
// session-store row that the suppressVisibleSessionEffects contract forbids.
61+
if (
62+
opts.suppressVisibleSessionEffects ||
63+
opts.sessionId?.trim() ||
64+
!resolution.isNewSession ||
65+
!resolution.sessionKey
66+
) {
67+
return resolution;
68+
}
69+
const reservationKey = `${resolution.storePath}::${resolution.sessionKey}`;
70+
return await serializeReservation(reservationKey, async () => {
71+
// Re-resolve inside the critical section: a concurrent command may have
72+
// reserved an id between the initial resolve and acquiring the lock.
73+
const rechecked = resolveSession(opts);
74+
if (!rechecked.isNewSession || !rechecked.sessionKey || !rechecked.sessionStore) {
75+
return rechecked;
76+
}
77+
const now = Date.now();
78+
await persistSessionEntry({
79+
sessionStore: rechecked.sessionStore,
80+
sessionKey: rechecked.sessionKey,
81+
storePath: rechecked.storePath,
82+
entry: { sessionId: rechecked.sessionId, updatedAt: now, sessionStartedAt: now },
83+
});
84+
return rechecked;
85+
});
86+
}

0 commit comments

Comments
 (0)