Skip to content

Commit 338d313

Browse files
committed
fix(tasks): scope shared run updates by session
1 parent 8fa5ac5 commit 338d313

9 files changed

Lines changed: 485 additions & 371 deletions

File tree

src/acp/control-plane/manager.core.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,7 @@ export class AcpSessionManager {
790790
}
791791
if (taskContext) {
792792
this.markBackgroundTaskRunning(taskContext.runId, {
793+
sessionKey,
793794
lastEventAt: Date.now(),
794795
progressSummary: taskProgressSummary || null,
795796
});
@@ -835,6 +836,7 @@ export class AcpSessionManager {
835836
if (taskContext) {
836837
const terminalResult = resolveBackgroundTaskTerminalResult(taskProgressSummary);
837838
this.markBackgroundTaskTerminal(taskContext.runId, {
839+
sessionKey,
838840
status: "succeeded",
839841
endedAt: Date.now(),
840842
lastEventAt: Date.now(),
@@ -874,6 +876,7 @@ export class AcpSessionManager {
874876
});
875877
if (taskContext) {
876878
this.markBackgroundTaskTerminal(taskContext.runId, {
879+
sessionKey,
877880
status: resolveBackgroundTaskFailureStatus(acpError),
878881
endedAt: Date.now(),
879882
lastEventAt: Date.now(),
@@ -1884,8 +1887,7 @@ export class AcpSessionManager {
18841887
createRunningTaskRun({
18851888
runtime: "acp",
18861889
sourceId: context.runId,
1887-
ownerKey: context.requesterSessionKey,
1888-
scopeKind: "session",
1890+
requesterSessionKey: context.requesterSessionKey,
18891891
requesterOrigin: context.requesterOrigin,
18901892
childSessionKey: context.childSessionKey,
18911893
runId: context.runId,
@@ -1903,13 +1905,16 @@ export class AcpSessionManager {
19031905
private markBackgroundTaskRunning(
19041906
runId: string,
19051907
params: {
1908+
sessionKey?: string;
19061909
lastEventAt?: number;
19071910
progressSummary?: string | null;
19081911
},
19091912
): void {
19101913
try {
19111914
startTaskRunByRunId({
19121915
runId,
1916+
runtime: "acp",
1917+
sessionKey: params.sessionKey,
19131918
lastEventAt: params.lastEventAt,
19141919
progressSummary: params.progressSummary,
19151920
});
@@ -1921,6 +1926,7 @@ export class AcpSessionManager {
19211926
private markBackgroundTaskTerminal(
19221927
runId: string,
19231928
params: {
1929+
sessionKey?: string;
19241930
status: "succeeded" | "failed" | "timed_out";
19251931
endedAt: number;
19261932
lastEventAt?: number;
@@ -1934,6 +1940,8 @@ export class AcpSessionManager {
19341940
if (params.status === "succeeded") {
19351941
completeTaskRunByRunId({
19361942
runId,
1943+
runtime: "acp",
1944+
sessionKey: params.sessionKey,
19371945
endedAt: params.endedAt,
19381946
lastEventAt: params.lastEventAt,
19391947
progressSummary: params.progressSummary,
@@ -1944,6 +1952,8 @@ export class AcpSessionManager {
19441952
}
19451953
failTaskRunByRunId({
19461954
runId,
1955+
runtime: "acp",
1956+
sessionKey: params.sessionKey,
19471957
status: params.status,
19481958
endedAt: params.endedAt,
19491959
lastEventAt: params.lastEventAt,

src/agents/acp-spawn-parent-stream.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ export function startAcpSpawnParentStreamRelay(params: {
206206
const emitStartNotice = () => {
207207
recordTaskRunProgressByRunId({
208208
runId,
209+
runtime: "acp",
210+
sessionKey: params.childSessionKey,
209211
lastEventAt: Date.now(),
210212
eventSummary: "Started.",
211213
});
@@ -273,6 +275,8 @@ export function startAcpSpawnParentStreamRelay(params: {
273275
stallNotified = true;
274276
recordTaskRunProgressByRunId({
275277
runId,
278+
runtime: "acp",
279+
sessionKey: params.childSessionKey,
276280
lastEventAt: Date.now(),
277281
eventSummary: `No output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for input.`,
278282
});
@@ -319,6 +323,8 @@ export function startAcpSpawnParentStreamRelay(params: {
319323
stallNotified = false;
320324
recordTaskRunProgressByRunId({
321325
runId,
326+
runtime: "acp",
327+
sessionKey: params.childSessionKey,
322328
lastEventAt: Date.now(),
323329
eventSummary: "Resumed output.",
324330
});

src/agents/subagent-registry-lifecycle.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ export function createSubagentRegistryLifecycleController(params: {
160160
}) => {
161161
setDetachedTaskDeliveryStatusByRunId({
162162
runId: giveUpParams.runId,
163+
runtime: "subagent",
164+
sessionKey: giveUpParams.entry.childSessionKey,
163165
deliveryStatus: "failed",
164166
});
165167
giveUpParams.entry.wakeOnDescendantSettle = undefined;
@@ -276,6 +278,8 @@ export function createSubagentRegistryLifecycleController(params: {
276278
if (didAnnounce) {
277279
setDetachedTaskDeliveryStatusByRunId({
278280
runId,
281+
runtime: "subagent",
282+
sessionKey: entry.childSessionKey,
279283
deliveryStatus: "delivered",
280284
});
281285
entry.wakeOnDescendantSettle = undefined;
@@ -332,6 +336,8 @@ export function createSubagentRegistryLifecycleController(params: {
332336
if (deferredDecision.kind === "give-up") {
333337
setDetachedTaskDeliveryStatusByRunId({
334338
runId,
339+
runtime: "subagent",
340+
sessionKey: entry.childSessionKey,
335341
deliveryStatus: "failed",
336342
});
337343
entry.wakeOnDescendantSettle = undefined;
@@ -466,6 +472,8 @@ export function createSubagentRegistryLifecycleController(params: {
466472
if (completeParams.outcome.status === "ok") {
467473
completeTaskRunByRunId({
468474
runId: entry.runId,
475+
runtime: "subagent",
476+
sessionKey: entry.childSessionKey,
469477
endedAt: entry.endedAt,
470478
lastEventAt: entry.endedAt ?? Date.now(),
471479
progressSummary: entry.frozenResultText ?? undefined,
@@ -474,6 +482,8 @@ export function createSubagentRegistryLifecycleController(params: {
474482
} else {
475483
failTaskRunByRunId({
476484
runId: entry.runId,
485+
runtime: "subagent",
486+
sessionKey: entry.childSessionKey,
477487
status: completeParams.outcome.status === "timeout" ? "timed_out" : "failed",
478488
endedAt: entry.endedAt,
479489
lastEventAt: entry.endedAt ?? Date.now(),

src/cron/service/ops.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,8 +400,7 @@ function tryCreateManualTaskRun(params: {
400400
createRunningTaskRun({
401401
runtime: "cron",
402402
sourceId: params.job.id,
403-
ownerKey: `system:cron:${params.job.id}`,
404-
scopeKind: "system",
403+
requesterSessionKey: "",
405404
childSessionKey: params.job.sessionKey,
406405
agentId: params.job.agentId,
407406
runId,
@@ -437,6 +436,7 @@ function tryFinishManualTaskRun(
437436
if (params.coreResult.status === "ok" || params.coreResult.status === "skipped") {
438437
completeTaskRunByRunId({
439438
runId: params.taskRunId,
439+
runtime: "cron",
440440
endedAt: params.endedAt,
441441
lastEventAt: params.endedAt,
442442
terminalSummary: params.coreResult.summary ?? undefined,
@@ -445,6 +445,7 @@ function tryFinishManualTaskRun(
445445
}
446446
failTaskRunByRunId({
447447
runId: params.taskRunId,
448+
runtime: "cron",
448449
status:
449450
normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out"
450451
? "timed_out"

src/cron/service/timer.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,7 @@ function tryCreateCronTaskRun(params: {
138138
createRunningTaskRun({
139139
runtime: "cron",
140140
sourceId: params.job.id,
141-
ownerKey: `system:cron:${params.job.id}`,
142-
scopeKind: "system",
141+
requesterSessionKey: "",
143142
childSessionKey: params.job.sessionKey,
144143
agentId: params.job.agentId,
145144
runId,
@@ -171,6 +170,7 @@ function tryFinishCronTaskRun(
171170
if (result.status === "ok" || result.status === "skipped") {
172171
completeTaskRunByRunId({
173172
runId: result.taskRunId,
173+
runtime: "cron",
174174
endedAt: result.endedAt,
175175
lastEventAt: result.endedAt,
176176
terminalSummary: result.summary ?? undefined,
@@ -179,6 +179,7 @@ function tryFinishCronTaskRun(
179179
}
180180
failTaskRunByRunId({
181181
runId: result.taskRunId,
182+
runtime: "cron",
182183
status:
183184
normalizeCronRunErrorText(result.error) === timeoutErrorMessage() ? "timed_out" : "failed",
184185
endedAt: result.endedAt,

src/tasks/task-executor.test.ts

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
setDetachedTaskDeliveryStatusByRunId,
1111
startTaskRunByRunId,
1212
} from "./task-executor.js";
13-
import { findTaskByRunId, resetTaskRegistryForTests } from "./task-registry.js";
13+
import { getTaskById, resetTaskRegistryForTests } from "./task-registry.js";
1414

1515
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;
1616
const hoisted = vi.hoisted(() => {
@@ -67,8 +67,7 @@ describe("task-executor", () => {
6767
await withTaskExecutorStateDir(async () => {
6868
const created = createQueuedTaskRun({
6969
runtime: "acp",
70-
ownerKey: "agent:main:main",
71-
scopeKind: "session",
70+
requesterSessionKey: "agent:main:main",
7271
childSessionKey: "agent:codex:acp:child",
7372
runId: "run-executor-queued",
7473
task: "Investigate issue",
@@ -90,7 +89,7 @@ describe("task-executor", () => {
9089
terminalSummary: "Done.",
9190
});
9291

93-
expect(findTaskByRunId("run-executor-queued")).toMatchObject({
92+
expect(getTaskById(created.taskId)).toMatchObject({
9493
taskId: created.taskId,
9594
status: "succeeded",
9695
startedAt: 100,
@@ -104,8 +103,7 @@ describe("task-executor", () => {
104103
await withTaskExecutorStateDir(async () => {
105104
const created = createRunningTaskRun({
106105
runtime: "subagent",
107-
ownerKey: "agent:main:main",
108-
scopeKind: "session",
106+
requesterSessionKey: "agent:main:main",
109107
childSessionKey: "agent:codex:subagent:child",
110108
runId: "run-executor-fail",
111109
task: "Write summary",
@@ -131,7 +129,7 @@ describe("task-executor", () => {
131129
deliveryStatus: "failed",
132130
});
133131

134-
expect(findTaskByRunId("run-executor-fail")).toMatchObject({
132+
expect(getTaskById(created.taskId)).toMatchObject({
135133
taskId: created.taskId,
136134
status: "failed",
137135
progressSummary: "Collecting results",
@@ -145,8 +143,7 @@ describe("task-executor", () => {
145143
await withTaskExecutorStateDir(async () => {
146144
const created = createRunningTaskRun({
147145
runtime: "acp",
148-
ownerKey: "agent:main:main",
149-
scopeKind: "session",
146+
requesterSessionKey: "agent:main:main",
150147
requesterOrigin: {
151148
channel: "telegram",
152149
to: "telegram:123",
@@ -167,7 +164,7 @@ describe("task-executor", () => {
167164
terminalSummary: "Writable session required.",
168165
});
169166

170-
expect(findTaskByRunId("run-executor-blocked")).toMatchObject({
167+
expect(getTaskById(created.taskId)).toMatchObject({
171168
taskId: created.taskId,
172169
status: "succeeded",
173170
terminalOutcome: "blocked",
@@ -182,8 +179,7 @@ describe("task-executor", () => {
182179

183180
const child = createRunningTaskRun({
184181
runtime: "acp",
185-
ownerKey: "agent:main:main",
186-
scopeKind: "session",
182+
requesterSessionKey: "agent:main:main",
187183
childSessionKey: "agent:codex:acp:child",
188184
runId: "run-linear-cancel",
189185
task: "Inspect a PR",
@@ -200,7 +196,7 @@ describe("task-executor", () => {
200196
found: true,
201197
cancelled: true,
202198
});
203-
expect(findTaskByRunId("run-linear-cancel")).toMatchObject({
199+
expect(getTaskById(child.taskId)).toMatchObject({
204200
taskId: child.taskId,
205201
status: "cancelled",
206202
});
@@ -221,8 +217,7 @@ describe("task-executor", () => {
221217

222218
const child = createRunningTaskRun({
223219
runtime: "subagent",
224-
ownerKey: "agent:main:main",
225-
scopeKind: "session",
220+
requesterSessionKey: "agent:main:main",
226221
childSessionKey: "agent:codex:subagent:child",
227222
runId: "run-subagent-cancel",
228223
task: "Inspect a PR",
@@ -239,7 +234,7 @@ describe("task-executor", () => {
239234
found: true,
240235
cancelled: true,
241236
});
242-
expect(findTaskByRunId("run-subagent-cancel")).toMatchObject({
237+
expect(getTaskById(child.taskId)).toMatchObject({
243238
taskId: child.taskId,
244239
status: "cancelled",
245240
});
@@ -249,4 +244,42 @@ describe("task-executor", () => {
249244
});
250245
});
251246
});
247+
248+
it("scopes run-id updates to the matching runtime and session", async () => {
249+
await withTaskExecutorStateDir(async () => {
250+
const victim = createRunningTaskRun({
251+
runtime: "acp",
252+
requesterSessionKey: "agent:victim:main",
253+
childSessionKey: "agent:victim:acp:child",
254+
runId: "run-shared-executor-scope",
255+
task: "Victim ACP task",
256+
deliveryStatus: "pending",
257+
});
258+
const attacker = createRunningTaskRun({
259+
runtime: "cli",
260+
requesterSessionKey: "agent:attacker:main",
261+
childSessionKey: "agent:attacker:main",
262+
runId: "run-shared-executor-scope",
263+
task: "Attacker CLI task",
264+
deliveryStatus: "not_applicable",
265+
});
266+
267+
failTaskRunByRunId({
268+
runId: "run-shared-executor-scope",
269+
runtime: "cli",
270+
sessionKey: "agent:attacker:main",
271+
endedAt: 40,
272+
lastEventAt: 40,
273+
error: "attacker controlled error",
274+
});
275+
276+
expect(getTaskById(attacker.taskId)).toMatchObject({
277+
status: "failed",
278+
error: "attacker controlled error",
279+
});
280+
expect(getTaskById(victim.taskId)).toMatchObject({
281+
status: "running",
282+
});
283+
});
284+
});
252285
});

0 commit comments

Comments
 (0)