Skip to content

Commit 0933726

Browse files
committed
fix(sdk): surface event pump failures
1 parent 9f48254 commit 0933726

3 files changed

Lines changed: 124 additions & 9 deletions

File tree

packages/sdk/src/client.ts

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,6 @@ export class OpenClaw {
445445
const matches = (event: OpenClawEvent) => event.runId === runId;
446446
const liveSource = this.normalizedEvents.stream(matches, { replay: true });
447447
const live = liveSource[Symbol.asyncIterator]();
448-
let nextLive = live.next();
449448
const seen = new Set<string>();
450449
try {
451450
for (const event of replayEvents) {
@@ -460,11 +459,10 @@ export class OpenClaw {
460459
yield runEvent;
461460
}
462461
while (true) {
463-
const next = await nextLive;
462+
const next = await live.next();
464463
if (next.done) {
465464
break;
466465
}
467-
nextLive = live.next();
468466
if (seen.has(next.value.id)) {
469467
continue;
470468
}
@@ -496,8 +494,11 @@ export class OpenClaw {
496494
};
497495
});
498496
this.eventPumpPromise = (async () => {
499-
const iterator = this.transport.events()[Symbol.asyncIterator]();
497+
let iterator: AsyncIterator<GatewayEvent> | undefined;
498+
let pumpError: unknown;
499+
let hasPumpError = false;
500500
try {
501+
iterator = this.transport.events()[Symbol.asyncIterator]();
501502
while (true) {
502503
const next = iterator.next();
503504
await Promise.resolve();
@@ -510,14 +511,28 @@ export class OpenClaw {
510511
this.recordReplayEvent(normalized);
511512
this.normalizedEvents.publish(normalized);
512513
}
514+
} catch (error) {
515+
pumpError = error;
516+
hasPumpError = true;
513517
} finally {
514518
markReady();
515-
await iterator.return?.();
516-
this.normalizedEvents.close();
519+
try {
520+
await iterator?.return?.();
521+
} catch (error) {
522+
if (!hasPumpError) {
523+
pumpError = error;
524+
hasPumpError = true;
525+
}
526+
}
527+
}
528+
if (hasPumpError) {
529+
this.normalizedEvents.close(pumpError);
530+
return;
517531
}
518-
})().catch(() => {
519-
markReady();
520532
this.normalizedEvents.close();
533+
})().catch((error: unknown) => {
534+
markReady();
535+
this.normalizedEvents.close(error);
521536
});
522537
return this.eventPumpReady;
523538
}

packages/sdk/src/event-hub.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ export class EventHub<T> {
1919
private readonly replayLimit: number;
2020
private readonly replayEvents: T[] = [];
2121
private closed = false;
22+
private closeError: unknown;
23+
private hasCloseError = false;
2224
private readonly listeners = new Set<Listener<T>>();
2325
private readonly waiters = new Set<() => void>();
2426

@@ -42,7 +44,12 @@ export class EventHub<T> {
4244
}
4345
}
4446

45-
close(): void {
47+
close(error?: unknown): void {
48+
const hasError = arguments.length > 0;
49+
if (hasError) {
50+
this.closeError = error;
51+
this.hasCloseError = true;
52+
}
4653
this.closed = true;
4754
this.replayEvents.length = 0;
4855
this.listeners.clear();
@@ -113,6 +120,9 @@ export class EventHub<T> {
113120
});
114121
}
115122
cleanup();
123+
if (this.hasCloseError) {
124+
throw this.closeError;
125+
}
116126
return { done: true, value: undefined as never };
117127
},
118128
return: async (): Promise<IteratorResult<T>> => {

packages/sdk/src/index.test.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ class FakeTransport implements OpenClawTransport {
5454
}
5555
}
5656

57+
class EventsOnlyTransport implements OpenClawTransport {
58+
constructor(private readonly eventSource: AsyncIterable<GatewayEvent>) {}
59+
60+
async request<T = unknown>(): Promise<T> {
61+
return {} as T;
62+
}
63+
64+
events(): AsyncIterable<GatewayEvent> {
65+
return this.eventSource;
66+
}
67+
}
68+
5769
function requireTransportCall(calls: readonly RequestCall[], index: number): RequestCall {
5870
const call = calls[index];
5971
if (!call) {
@@ -683,6 +695,84 @@ describe("OpenClaw SDK", () => {
683695
expect(seen).toEqual(["run.started", "assistant.delta", "run.completed"]);
684696
});
685697

698+
it("rejects normalized event streams when the event pump fails before yielding", async () => {
699+
const failure = new Error("synthetic transport event failure");
700+
const transport = new EventsOnlyTransport({
701+
[Symbol.asyncIterator](): AsyncIterator<GatewayEvent> {
702+
return {
703+
next: async () => {
704+
throw failure;
705+
},
706+
};
707+
},
708+
});
709+
const oc = new OpenClaw({ transport });
710+
const iterator = oc.events()[Symbol.asyncIterator]();
711+
let futureIterator: AsyncIterator<OpenClawEvent> | undefined;
712+
713+
try {
714+
await expect(iterator.next()).rejects.toThrow("synthetic transport event failure");
715+
716+
futureIterator = oc.events()[Symbol.asyncIterator]();
717+
await expect(futureIterator.next()).rejects.toThrow("synthetic transport event failure");
718+
} finally {
719+
await futureIterator?.return?.();
720+
await iterator.return?.();
721+
await oc.close();
722+
}
723+
});
724+
725+
it("rejects run event streams after replaying events when the event pump fails", async () => {
726+
const failure = new Error("synthetic post-yield transport event failure");
727+
const rawEvent: GatewayEvent = {
728+
event: "agent",
729+
seq: 1,
730+
payload: {
731+
runId: "run_pump_failure",
732+
stream: "lifecycle",
733+
ts: 1_777_000_000_050,
734+
data: { phase: "start" },
735+
},
736+
};
737+
const transport = new EventsOnlyTransport({
738+
async *[Symbol.asyncIterator]() {
739+
yield rawEvent;
740+
throw failure;
741+
},
742+
});
743+
const oc = new OpenClaw({ transport });
744+
const run = await oc.runs.get("run_pump_failure");
745+
const iterator = run.events()[Symbol.asyncIterator]();
746+
let futureIterator: AsyncIterator<OpenClawEvent> | undefined;
747+
748+
try {
749+
const first = await iterator.next();
750+
expect(first.done).toBe(false);
751+
if (first.done !== false) {
752+
throw new Error("expected first run event");
753+
}
754+
expect(first.value.type).toBe("run.started");
755+
expect(first.value.runId).toBe("run_pump_failure");
756+
757+
await expect(iterator.next()).rejects.toThrow("synthetic post-yield transport event failure");
758+
759+
futureIterator = run.events()[Symbol.asyncIterator]();
760+
const replayed = await futureIterator.next();
761+
expect(replayed.done).toBe(false);
762+
if (replayed.done !== false) {
763+
throw new Error("expected replayed run event");
764+
}
765+
expect(replayed.value.type).toBe("run.started");
766+
await expect(futureIterator.next()).rejects.toThrow(
767+
"synthetic post-yield transport event failure",
768+
);
769+
} finally {
770+
await futureIterator?.return?.();
771+
await iterator.return?.();
772+
await oc.close();
773+
}
774+
});
775+
686776
it("does not surface raw chat projection events in per-run streams", async () => {
687777
const ts = 1_777_000_000_100;
688778
const transport = new FakeTransport({

0 commit comments

Comments
 (0)