Skip to content

Commit 899dc5f

Browse files
steipeteMrGeDiao
andauthored
fix(memory): retry transient embedding failures
Retry live query embeddings on transient provider transport failures and split eligible batch embedding socket failures after bounded retries. Fixes #71784 Fixes #44166 Supersedes #44167 Co-authored-by: MrGeDiao <MrGeDiao@users.noreply.github.com>
1 parent 95b2f9c commit 899dc5f

5 files changed

Lines changed: 297 additions & 24 deletions

File tree

extensions/memory-core/src/memory/index.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,90 @@ describe("memory index", () => {
572572
);
573573
});
574574

575+
it("retries transient query embedding transport failures during search", async () => {
576+
const cfg = createCfg({
577+
storePath: path.join(workspaceDir, "index-search-query-retry.sqlite"),
578+
hybrid: { enabled: true, vectorWeight: 0.5, textWeight: 0.5 },
579+
});
580+
const manager = await getPersistentManager(cfg);
581+
await manager.sync({ reason: "test" });
582+
583+
let queryCalls = 0;
584+
(
585+
manager as unknown as {
586+
provider: {
587+
id: string;
588+
model: string;
589+
embedQuery: (text: string) => Promise<number[]>;
590+
embedBatch: (texts: string[]) => Promise<number[][]>;
591+
close: () => Promise<void>;
592+
};
593+
waitForEmbeddingRetry: (delayMs: number, action: string) => Promise<void>;
594+
}
595+
).provider = {
596+
id: "openai",
597+
model: "mock-embed",
598+
embedQuery: async () => {
599+
queryCalls += 1;
600+
if (queryCalls === 1) {
601+
throw new Error("TypeError: fetch failed | other side closed");
602+
}
603+
return [1, 0, 0, 0];
604+
},
605+
embedBatch: async (texts: string[]) => texts.map(() => [1, 0, 0, 0]),
606+
close: async () => {},
607+
};
608+
(
609+
manager as unknown as {
610+
waitForEmbeddingRetry: (delayMs: number, action: string) => Promise<void>;
611+
}
612+
).waitForEmbeddingRetry = async () => {};
613+
614+
const results = await manager.search("alpha");
615+
616+
expect(queryCalls).toBe(2);
617+
expect(results.some((result) => result.path.endsWith("memory/2026-01-12.md"))).toBe(true);
618+
});
619+
620+
it("fails search after bounded query embedding retries are exhausted", async () => {
621+
const cfg = createCfg({
622+
storePath: path.join(workspaceDir, "index-search-query-retry-exhausted.sqlite"),
623+
hybrid: { enabled: true, vectorWeight: 0.5, textWeight: 0.5 },
624+
});
625+
const manager = await getPersistentManager(cfg);
626+
await manager.sync({ reason: "test" });
627+
628+
let queryCalls = 0;
629+
(
630+
manager as unknown as {
631+
provider: {
632+
id: string;
633+
model: string;
634+
embedQuery: (text: string) => Promise<number[]>;
635+
embedBatch: (texts: string[]) => Promise<number[][]>;
636+
close: () => Promise<void>;
637+
};
638+
}
639+
).provider = {
640+
id: "openai",
641+
model: "mock-embed",
642+
embedQuery: async () => {
643+
queryCalls += 1;
644+
throw new Error("TypeError: fetch failed | other side closed");
645+
},
646+
embedBatch: async (texts: string[]) => texts.map(() => [1, 0, 0, 0]),
647+
close: async () => {},
648+
};
649+
(
650+
manager as unknown as {
651+
waitForEmbeddingRetry: (delayMs: number, action: string) => Promise<void>;
652+
}
653+
).waitForEmbeddingRetry = async () => {};
654+
655+
await expect(manager.search("alpha")).rejects.toThrow("fetch failed");
656+
expect(queryCalls).toBe(3);
657+
});
658+
575659
it("preserves keyword-only hybrid hits when minScore exceeds text weight", async () => {
576660
await expectHybridKeywordSearchFindsMemory(
577661
createCfg({

extensions/memory-core/src/memory/manager-embedding-ops.ts

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ import {
3232
buildTextEmbeddingInputs,
3333
filterNonEmptyMemoryChunks,
3434
isRetryableMemoryEmbeddingError,
35+
isSplittableMemoryEmbeddingTransportError,
3536
resolveMemoryEmbeddingRetryDelay,
37+
runMemoryEmbeddingBatchRetryWithSplit,
3638
runMemoryEmbeddingRetryLoop,
3739
} from "./manager-embedding-policy.js";
3840
import { deleteMemoryFtsRows } from "./manager-fts-state.js";
@@ -344,26 +346,33 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
344346
throw new Error("Cannot embed batch in FTS-only mode (no embedding provider)");
345347
}
346348
try {
347-
return await runMemoryEmbeddingRetryLoop({
348-
run: async () => {
349+
return await runMemoryEmbeddingBatchRetryWithSplit({
350+
items: texts,
351+
run: async (batchTexts) => {
349352
const timeoutMs = this.resolveEmbeddingTimeout("batch");
350353
log.debug("memory embeddings: batch start", {
351354
provider: provider.id,
352-
items: texts.length,
355+
items: batchTexts.length,
353356
timeoutMs,
354357
});
355358
return await runEmbeddingOperationWithTimeout({
356359
timeoutMs,
357360
message: `memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`,
358-
run: async (signal) => await provider.embedBatch(texts, { signal }),
361+
run: async (signal) => await provider.embedBatch(batchTexts, { signal }),
359362
});
360363
},
361364
isRetryable: isRetryableMemoryEmbeddingError,
365+
isSplittable: isSplittableMemoryEmbeddingTransportError,
362366
waitForRetry: async (delayMs) => {
363367
await this.waitForEmbeddingRetry(delayMs, "retrying");
364368
},
365369
maxAttempts: EMBEDDING_RETRY_MAX_ATTEMPTS,
366370
baseDelayMs: EMBEDDING_RETRY_BASE_DELAY_MS,
371+
onSplit: ({ itemCount, splitAt }) => {
372+
log.warn(
373+
`memory embeddings transport failed after retries; splitting batch of ${itemCount} into ${splitAt} + ${itemCount - splitAt}`,
374+
);
375+
},
367376
});
368377
} catch (err) {
369378
this.markLocalEmbeddingProviderDegraded(err);
@@ -385,26 +394,33 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
385394
return await this.embedBatchWithRetry(inputs.map((input) => input.text));
386395
}
387396
try {
388-
return await runMemoryEmbeddingRetryLoop({
389-
run: async () => {
397+
return await runMemoryEmbeddingBatchRetryWithSplit({
398+
items: inputs,
399+
run: async (batchInputs) => {
390400
const timeoutMs = this.resolveEmbeddingTimeout("batch");
391401
log.debug("memory embeddings: structured batch start", {
392402
provider: provider.id,
393-
items: inputs.length,
403+
items: batchInputs.length,
394404
timeoutMs,
395405
});
396406
return await runEmbeddingOperationWithTimeout({
397407
timeoutMs,
398408
message: `memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`,
399-
run: async (signal) => await embedBatchInputs(inputs, { signal }),
409+
run: async (signal) => await embedBatchInputs(batchInputs, { signal }),
400410
});
401411
},
402412
isRetryable: isRetryableMemoryEmbeddingError,
413+
isSplittable: isSplittableMemoryEmbeddingTransportError,
403414
waitForRetry: async (delayMs) => {
404415
await this.waitForEmbeddingRetry(delayMs, "retrying structured batch");
405416
},
406417
maxAttempts: EMBEDDING_RETRY_MAX_ATTEMPTS,
407418
baseDelayMs: EMBEDDING_RETRY_BASE_DELAY_MS,
419+
onSplit: ({ itemCount, splitAt }) => {
420+
log.warn(
421+
`memory embeddings transport failed after retries; splitting structured batch of ${itemCount} into ${splitAt} + ${itemCount - splitAt}`,
422+
);
423+
},
408424
});
409425
} catch (err) {
410426
this.markLocalEmbeddingProviderDegraded(err);
@@ -422,7 +438,7 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
422438
Math.random(),
423439
EMBEDDING_RETRY_MAX_DELAY_MS,
424440
);
425-
log.warn(`memory embeddings rate limited; ${action} in ${waitMs}ms`);
441+
log.warn(`memory embeddings retryable error; ${action} in ${waitMs}ms`);
426442
await new Promise((resolve) => setTimeout(resolve, waitMs));
427443
}
428444

@@ -435,18 +451,28 @@ export abstract class MemoryManagerEmbeddingOps extends MemoryManagerSyncOps {
435451
});
436452
}
437453

438-
protected async embedQueryWithTimeout(text: string): Promise<number[]> {
454+
protected async embedQueryWithRetry(text: string): Promise<number[]> {
439455
const provider = this.provider;
440456
if (!provider) {
441457
throw new Error("Cannot embed query in FTS-only mode (no embedding provider)");
442458
}
443-
const timeoutMs = this.resolveEmbeddingTimeout("query");
444-
log.debug("memory embeddings: query start", { provider: provider.id, timeoutMs });
445459
try {
446-
return await runEmbeddingOperationWithTimeout({
447-
timeoutMs,
448-
message: `memory embeddings query timed out after ${Math.round(timeoutMs / 1000)}s`,
449-
run: async (signal) => await provider.embedQuery(text, { signal }),
460+
return await runMemoryEmbeddingRetryLoop({
461+
run: async () => {
462+
const timeoutMs = this.resolveEmbeddingTimeout("query");
463+
log.debug("memory embeddings: query start", { provider: provider.id, timeoutMs });
464+
return await runEmbeddingOperationWithTimeout({
465+
timeoutMs,
466+
message: `memory embeddings query timed out after ${Math.round(timeoutMs / 1000)}s`,
467+
run: async (signal) => await provider.embedQuery(text, { signal }),
468+
});
469+
},
470+
isRetryable: isRetryableMemoryEmbeddingError,
471+
waitForRetry: async (delayMs) => {
472+
await this.waitForEmbeddingRetry(delayMs, "retrying query");
473+
},
474+
maxAttempts: EMBEDDING_RETRY_MAX_ATTEMPTS,
475+
baseDelayMs: EMBEDDING_RETRY_BASE_DELAY_MS,
450476
});
451477
} catch (err) {
452478
this.markLocalEmbeddingProviderDegraded(err);

extensions/memory-core/src/memory/manager-embedding-policy.test.ts

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ import { describe, expect, it, vi } from "vitest";
22
import {
33
buildMemoryEmbeddingBatches,
44
filterNonEmptyMemoryChunks,
5+
isRetryableMemoryEmbeddingTransportError,
56
isRetryableMemoryEmbeddingError,
7+
isSplittableMemoryEmbeddingTransportError,
68
isStructuredInputTooLargeMemoryEmbeddingError,
79
resolveMemoryEmbeddingRetryDelay,
10+
runMemoryEmbeddingBatchRetryWithSplit,
811
runMemoryEmbeddingRetryLoop,
912
} from "./manager-embedding-policy.js";
1013

@@ -72,17 +75,30 @@ describe("memory embedding policy", () => {
7275
});
7376

7477
it("retries transient socket/network embedding errors", () => {
75-
const messages = [
78+
const splittableMessages = [
7679
"TypeError: fetch failed | other side closed",
7780
"undici error: UND_ERR_SOCKET",
7881
"read ECONNRESET",
7982
"socket hang up",
80-
"ETIMEDOUT",
8183
];
8284

83-
for (const message of messages) {
85+
for (const message of splittableMessages) {
8486
expect(isRetryableMemoryEmbeddingError(message)).toBe(true);
87+
expect(isRetryableMemoryEmbeddingTransportError(message)).toBe(true);
88+
expect(isSplittableMemoryEmbeddingTransportError(message)).toBe(true);
8589
}
90+
expect(isRetryableMemoryEmbeddingTransportError("ECONNREFUSED")).toBe(true);
91+
expect(isSplittableMemoryEmbeddingTransportError("ECONNREFUSED")).toBe(false);
92+
expect(isRetryableMemoryEmbeddingTransportError("EHOSTUNREACH")).toBe(true);
93+
expect(isSplittableMemoryEmbeddingTransportError("EHOSTUNREACH")).toBe(false);
94+
expect(isRetryableMemoryEmbeddingTransportError("memory embeddings batch timed out")).toBe(
95+
true,
96+
);
97+
expect(isSplittableMemoryEmbeddingTransportError("memory embeddings batch timed out")).toBe(
98+
false,
99+
);
100+
expect(isRetryableMemoryEmbeddingTransportError("worker terminated by user")).toBe(false);
101+
expect(isRetryableMemoryEmbeddingTransportError("embedding validation failed")).toBe(false);
86102
});
87103

88104
it("retries too-many-tokens-per-day errors", async () => {
@@ -110,6 +126,97 @@ describe("memory embedding policy", () => {
110126
expect(waits).toEqual([500]);
111127
});
112128

129+
it("stops after the configured maximum attempts", async () => {
130+
const run = vi.fn(async () => {
131+
throw new Error("TypeError: fetch failed | other side closed");
132+
});
133+
const waits: number[] = [];
134+
135+
await expect(
136+
runMemoryEmbeddingRetryLoop({
137+
run,
138+
isRetryable: isRetryableMemoryEmbeddingError,
139+
waitForRetry: async (delayMs) => {
140+
waits.push(delayMs);
141+
},
142+
maxAttempts: 3,
143+
baseDelayMs: 500,
144+
}),
145+
).rejects.toThrow("fetch failed");
146+
147+
expect(run).toHaveBeenCalledTimes(3);
148+
expect(waits).toEqual([500, 1000]);
149+
});
150+
151+
it("splits transport-failed batches after retries are exhausted", async () => {
152+
const waits: number[] = [];
153+
const splits: string[] = [];
154+
const run = vi.fn(async (items: string[]) => {
155+
if (items.length > 1) {
156+
throw new TypeError("fetch failed | other side closed");
157+
}
158+
return items.map((item) => [item.charCodeAt(0)]);
159+
});
160+
161+
const result = await runMemoryEmbeddingBatchRetryWithSplit({
162+
items: ["a", "b", "c", "d"],
163+
run,
164+
isRetryable: isRetryableMemoryEmbeddingError,
165+
isSplittable: isSplittableMemoryEmbeddingTransportError,
166+
waitForRetry: async (delayMs) => {
167+
waits.push(delayMs);
168+
},
169+
maxAttempts: 2,
170+
baseDelayMs: 500,
171+
onSplit: ({ itemCount, splitAt }) => {
172+
splits.push(`${itemCount}:${splitAt}`);
173+
},
174+
});
175+
176+
expect(result).toEqual([[97], [98], [99], [100]]);
177+
expect(run.mock.calls.map(([items]) => items.length)).toEqual([4, 4, 2, 2, 1, 1, 2, 2, 1, 1]);
178+
expect(waits).toEqual([500, 500, 500]);
179+
expect(splits).toEqual(["4:2", "2:1", "2:1"]);
180+
});
181+
182+
it("does not split exhausted service retry errors", async () => {
183+
const run = vi.fn(async () => {
184+
throw new Error("openai embeddings failed: 429 rate limit");
185+
});
186+
187+
await expect(
188+
runMemoryEmbeddingBatchRetryWithSplit({
189+
items: ["a", "b"],
190+
run,
191+
isRetryable: isRetryableMemoryEmbeddingError,
192+
isSplittable: isSplittableMemoryEmbeddingTransportError,
193+
waitForRetry: async () => {},
194+
maxAttempts: 1,
195+
baseDelayMs: 500,
196+
}),
197+
).rejects.toThrow("429 rate limit");
198+
expect(run).toHaveBeenCalledTimes(1);
199+
});
200+
201+
it("does not split whole-endpoint transport outages", async () => {
202+
const run = vi.fn(async () => {
203+
throw new Error("connect ECONNREFUSED 127.0.0.1:11434");
204+
});
205+
206+
await expect(
207+
runMemoryEmbeddingBatchRetryWithSplit({
208+
items: ["a", "b"],
209+
run,
210+
isRetryable: isRetryableMemoryEmbeddingError,
211+
isSplittable: isSplittableMemoryEmbeddingTransportError,
212+
waitForRetry: async () => {},
213+
maxAttempts: 2,
214+
baseDelayMs: 500,
215+
}),
216+
).rejects.toThrow("ECONNREFUSED");
217+
expect(run).toHaveBeenCalledTimes(2);
218+
});
219+
113220
it("classifies oversized structured-input errors", () => {
114221
expect(isStructuredInputTooLargeMemoryEmbeddingError("payload too large")).toBe(true);
115222
expect(

0 commit comments

Comments
 (0)