Skip to content

Commit 755ac6b

Browse files
committed
fix(memory): retry transient embedding transport failures
1 parent 631552c commit 755ac6b

3 files changed

Lines changed: 144 additions & 12 deletions

File tree

extensions/memory-core/src/memory/manager-embedding-ops.ts

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ import {
3131
buildMemoryEmbeddingBatches,
3232
buildTextEmbeddingInputs,
3333
filterNonEmptyMemoryChunks,
34+
isRetryableMemoryEmbeddingTransportError,
3435
isRetryableMemoryEmbeddingError,
3536
resolveMemoryEmbeddingRetryDelay,
36-
runMemoryEmbeddingRetryLoop,
37+
runMemoryEmbeddingBatchRetryWithSplit,
3738
} from "./manager-embedding-policy.js";
3839
import { deleteMemoryFtsRows } from "./manager-fts-state.js";
3940
import { MemoryManagerSyncOps } from "./manager-sync-ops.js";
@@ -272,26 +273,33 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
272273
if (!provider) {
273274
throw new Error("Cannot embed batch in FTS-only mode (no embedding provider)");
274275
}
275-
return await runMemoryEmbeddingRetryLoop({
276-
run: async () => {
276+
return await runMemoryEmbeddingBatchRetryWithSplit({
277+
items: texts,
278+
run: async (batchTexts) => {
277279
const timeoutMs = this.resolveEmbeddingTimeout("batch");
278280
log.debug("memory embeddings: batch start", {
279281
provider: provider.id,
280-
items: texts.length,
282+
items: batchTexts.length,
281283
timeoutMs,
282284
});
283285
return await this.withTimeout(
284-
provider.embedBatch(texts),
286+
provider.embedBatch(batchTexts),
285287
timeoutMs,
286288
`memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`,
287289
);
288290
},
289291
isRetryable: isRetryableMemoryEmbeddingError,
292+
isSplittable: isRetryableMemoryEmbeddingTransportError,
290293
waitForRetry: async (delayMs) => {
291294
await this.waitForEmbeddingRetry(delayMs, "retrying");
292295
},
293296
maxAttempts: EMBEDDING_RETRY_MAX_ATTEMPTS,
294297
baseDelayMs: EMBEDDING_RETRY_BASE_DELAY_MS,
298+
onSplit: ({ itemCount, splitAt }) => {
299+
log.warn(
300+
`memory embeddings transport failed after retries; splitting batch of ${itemCount} into ${splitAt} + ${itemCount - splitAt}`,
301+
);
302+
},
295303
});
296304
}
297305

@@ -304,26 +312,33 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
304312
if (!embedBatchInputs) {
305313
return await this.embedBatchWithRetry(inputs.map((input) => input.text));
306314
}
307-
return await runMemoryEmbeddingRetryLoop({
308-
run: async () => {
315+
return await runMemoryEmbeddingBatchRetryWithSplit({
316+
items: inputs,
317+
run: async (batchInputs) => {
309318
const timeoutMs = this.resolveEmbeddingTimeout("batch");
310319
log.debug("memory embeddings: structured batch start", {
311320
provider: provider.id,
312-
items: inputs.length,
321+
items: batchInputs.length,
313322
timeoutMs,
314323
});
315324
return await this.withTimeout(
316-
embedBatchInputs(inputs),
325+
embedBatchInputs(batchInputs),
317326
timeoutMs,
318327
`memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`,
319328
);
320329
},
321330
isRetryable: isRetryableMemoryEmbeddingError,
331+
isSplittable: isRetryableMemoryEmbeddingTransportError,
322332
waitForRetry: async (delayMs) => {
323333
await this.waitForEmbeddingRetry(delayMs, "retrying structured batch");
324334
},
325335
maxAttempts: EMBEDDING_RETRY_MAX_ATTEMPTS,
326336
baseDelayMs: EMBEDDING_RETRY_BASE_DELAY_MS,
337+
onSplit: ({ itemCount, splitAt }) => {
338+
log.warn(
339+
`memory embeddings transport failed after retries; splitting structured batch of ${itemCount} into ${splitAt} + ${itemCount - splitAt}`,
340+
);
341+
},
327342
});
328343
}
329344

@@ -333,7 +348,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
333348
Math.random(),
334349
EMBEDDING_RETRY_MAX_DELAY_MS,
335350
);
336-
log.warn(`memory embeddings rate limited; ${action} in ${waitMs}ms`);
351+
log.warn(`memory embeddings retryable error; ${action} in ${waitMs}ms`);
337352
await new Promise((resolve) => setTimeout(resolve, waitMs));
338353
}
339354

extensions/memory-core/src/memory/manager-embedding-policy.test.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ import { describe, expect, it, vi } from "vitest";
22
import {
33
buildMemoryEmbeddingBatches,
44
filterNonEmptyMemoryChunks,
5+
isRetryableMemoryEmbeddingTransportError,
56
isRetryableMemoryEmbeddingError,
67
isStructuredInputTooLargeMemoryEmbeddingError,
78
resolveMemoryEmbeddingRetryDelay,
9+
runMemoryEmbeddingBatchRetryWithSplit,
810
runMemoryEmbeddingRetryLoop,
911
} from "./manager-embedding-policy.js";
1012

@@ -96,6 +98,73 @@ describe("memory embedding policy", () => {
9698
expect(waits).toEqual([500]);
9799
});
98100

101+
it("classifies transient transport embedding errors as retryable", () => {
102+
const retryableMessages = [
103+
"TypeError: fetch failed",
104+
"read ECONNRESET",
105+
"socket hang up",
106+
"UND_ERR_SOCKET: other side closed",
107+
"connection refused",
108+
];
109+
110+
for (const message of retryableMessages) {
111+
expect(isRetryableMemoryEmbeddingTransportError(message)).toBe(true);
112+
expect(isRetryableMemoryEmbeddingError(message)).toBe(true);
113+
}
114+
expect(isRetryableMemoryEmbeddingTransportError("worker terminated by user")).toBe(false);
115+
expect(isRetryableMemoryEmbeddingTransportError("embedding validation failed")).toBe(false);
116+
});
117+
118+
it("splits transport-failed batches after retries are exhausted", async () => {
119+
const waits: number[] = [];
120+
const splits: string[] = [];
121+
const run = vi.fn(async (items: string[]) => {
122+
if (items.length > 1) {
123+
throw new TypeError("fetch failed");
124+
}
125+
return items.map((item) => [item.charCodeAt(0)]);
126+
});
127+
128+
const result = await runMemoryEmbeddingBatchRetryWithSplit({
129+
items: ["a", "b", "c", "d"],
130+
run,
131+
isRetryable: isRetryableMemoryEmbeddingError,
132+
isSplittable: isRetryableMemoryEmbeddingTransportError,
133+
waitForRetry: async (delayMs) => {
134+
waits.push(delayMs);
135+
},
136+
maxAttempts: 1,
137+
baseDelayMs: 500,
138+
onSplit: ({ itemCount, splitAt }) => {
139+
splits.push(`${itemCount}:${splitAt}`);
140+
},
141+
});
142+
143+
expect(result).toEqual([[97], [98], [99], [100]]);
144+
expect(run.mock.calls.map(([items]) => items.length)).toEqual([4, 4, 2, 2, 1, 1, 2, 2, 1, 1]);
145+
expect(waits).toEqual([500, 500, 500]);
146+
expect(splits).toEqual(["4:2", "2:1", "2:1"]);
147+
});
148+
149+
it("does not split exhausted service retry errors", async () => {
150+
const run = vi.fn(async () => {
151+
throw new Error("openai embeddings failed: 429 rate limit");
152+
});
153+
154+
await expect(
155+
runMemoryEmbeddingBatchRetryWithSplit({
156+
items: ["a", "b"],
157+
run,
158+
isRetryable: isRetryableMemoryEmbeddingError,
159+
isSplittable: isRetryableMemoryEmbeddingTransportError,
160+
waitForRetry: async () => {},
161+
maxAttempts: 0,
162+
baseDelayMs: 500,
163+
}),
164+
).rejects.toThrow("429 rate limit");
165+
expect(run).toHaveBeenCalledTimes(1);
166+
});
167+
99168
it("classifies oversized structured-input errors", () => {
100169
expect(isStructuredInputTooLargeMemoryEmbeddingError("payload too large")).toBe(true);
101170
expect(

extensions/memory-core/src/memory/manager-embedding-policy.ts

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,20 @@ export function buildMemoryEmbeddingBatches<T extends MemoryEmbeddingChunk>(
8080
return batches;
8181
}
8282

83+
const RETRYABLE_MEMORY_EMBEDDING_SERVICE_ERROR_RE =
84+
/(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare|tokens per day)/i;
85+
86+
const RETRYABLE_MEMORY_EMBEDDING_TRANSPORT_ERROR_RE =
87+
/(fetch failed|network error|socket hang up|socket terminated|other side closed|connection (?:reset|refused|aborted|timed out)|read ECONNRESET|write EPIPE|ECONNRESET|ECONNREFUSED|ETIMEDOUT|EPIPE|EHOSTUNREACH|ENETUNREACH|ECONNABORTED|EAI_AGAIN|UND_ERR_(?:CONNECT_TIMEOUT|DNS_RESOLVE_FAILED|CONNECT|SOCKET|HEADERS_TIMEOUT|BODY_TIMEOUT))/i;
88+
89+
export function isRetryableMemoryEmbeddingTransportError(message: string): boolean {
90+
return RETRYABLE_MEMORY_EMBEDDING_TRANSPORT_ERROR_RE.test(message);
91+
}
92+
8393
export function isRetryableMemoryEmbeddingError(message: string): boolean {
84-
return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare|tokens per day)/i.test(
85-
message,
94+
return (
95+
RETRYABLE_MEMORY_EMBEDDING_SERVICE_ERROR_RE.test(message) ||
96+
isRetryableMemoryEmbeddingTransportError(message)
8697
);
8798
}
8899

@@ -124,6 +135,43 @@ export async function runMemoryEmbeddingRetryLoop<T>(params: {
124135
}
125136
}
126137

138+
export async function runMemoryEmbeddingBatchRetryWithSplit<TInput, TOutput>(params: {
139+
items: TInput[];
140+
run: (items: TInput[]) => Promise<TOutput[]>;
141+
isRetryable: (message: string) => boolean;
142+
isSplittable: (message: string) => boolean;
143+
waitForRetry: (delayMs: number) => Promise<void>;
144+
maxAttempts: number;
145+
baseDelayMs: number;
146+
onSplit?: (info: { itemCount: number; splitAt: number; message: string }) => void;
147+
}): Promise<TOutput[]> {
148+
try {
149+
return await runMemoryEmbeddingRetryLoop({
150+
run: async () => await params.run(params.items),
151+
isRetryable: params.isRetryable,
152+
waitForRetry: params.waitForRetry,
153+
maxAttempts: params.maxAttempts,
154+
baseDelayMs: params.baseDelayMs,
155+
});
156+
} catch (err) {
157+
const message = formatErrorMessage(err);
158+
if (params.items.length <= 1 || !params.isSplittable(message)) {
159+
throw err;
160+
}
161+
const splitAt = Math.ceil(params.items.length / 2);
162+
params.onSplit?.({ itemCount: params.items.length, splitAt, message });
163+
const left = await runMemoryEmbeddingBatchRetryWithSplit({
164+
...params,
165+
items: params.items.slice(0, splitAt),
166+
});
167+
const right = await runMemoryEmbeddingBatchRetryWithSplit({
168+
...params,
169+
items: params.items.slice(splitAt),
170+
});
171+
return [...left, ...right];
172+
}
173+
}
174+
127175
export function buildTextEmbeddingInputs(chunks: MemoryEmbeddingChunk[]): MemoryEmbeddingInput[] {
128176
return chunks.map((chunk) => chunk.embeddingInput ?? { text: chunk.text });
129177
}

0 commit comments

Comments
 (0)