Skip to content

Commit 33c4462

Browse files
committed
fix(memory): bound embedding batch poll intervals
1 parent 68b5371 commit 33c4462

5 files changed

Lines changed: 101 additions & 12 deletions

File tree

extensions/google/embedding-batch.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ export async function runGeminiEmbeddingBatches(
284284
maxRequests: GEMINI_BATCH_MAX_REQUESTS,
285285
debugLabel: "memory embeddings: gemini batch submit",
286286
}),
287-
runGroup: async ({ group, groupIndex, groups, byCustomId }) => {
287+
runGroup: async ({ group, groupIndex, groups, byCustomId, pollIntervalMs, timeoutMs }) => {
288288
const batchInfo = await submitGeminiBatch({
289289
gemini: params.gemini,
290290
requests: group,
@@ -326,8 +326,8 @@ export async function runGeminiEmbeddingBatches(
326326
gemini: params.gemini,
327327
batchName,
328328
wait: params.wait,
329-
pollIntervalMs: params.pollIntervalMs,
330-
timeoutMs: params.timeoutMs,
329+
pollIntervalMs,
330+
timeoutMs,
331331
debug: params.debug,
332332
initial: batchInfo,
333333
});

extensions/openai/embedding-batch.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ export async function runOpenAiEmbeddingBatches(
211211
maxRequests: OPENAI_BATCH_MAX_REQUESTS,
212212
debugLabel: "memory embeddings: openai batch submit",
213213
}),
214-
runGroup: async ({ group, groupIndex, groups, byCustomId }) => {
214+
runGroup: async ({ group, groupIndex, groups, byCustomId, pollIntervalMs, timeoutMs }) => {
215215
const batchInfo = await submitOpenAiBatch({
216216
openAi: params.openAi,
217217
requests: group,
@@ -239,8 +239,8 @@ export async function runOpenAiEmbeddingBatches(
239239
openAi: params.openAi,
240240
batchId,
241241
wait: params.wait,
242-
pollIntervalMs: params.pollIntervalMs,
243-
timeoutMs: params.timeoutMs,
242+
pollIntervalMs,
243+
timeoutMs,
244244
debug: params.debug,
245245
initial: batchInfo,
246246
}),

extensions/voyage/embedding-batch.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ export async function runVoyageEmbeddingBatches(
228228
maxRequests: VOYAGE_BATCH_MAX_REQUESTS,
229229
debugLabel: "memory embeddings: voyage batch submit",
230230
}),
231-
runGroup: async ({ group, groupIndex, groups, byCustomId }) => {
231+
runGroup: async ({ group, groupIndex, groups, byCustomId, pollIntervalMs, timeoutMs }) => {
232232
const batchInfo = await submitVoyageBatch({
233233
client: params.client,
234234
requests: group,
@@ -257,8 +257,8 @@ export async function runVoyageEmbeddingBatches(
257257
client: params.client,
258258
batchId,
259259
wait: params.wait,
260-
pollIntervalMs: params.pollIntervalMs,
261-
timeoutMs: params.timeoutMs,
260+
pollIntervalMs,
261+
timeoutMs,
262262
debug: params.debug,
263263
initial: batchInfo,
264264
deps,
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import { MAX_SAFE_TIMEOUT_DELAY_MS } from "../../../gateway-client/src/timeouts.js";
3+
import { buildEmbeddingBatchGroupOptions, runEmbeddingBatchGroups } from "./batch-runner.js";
4+
5+
describe("buildEmbeddingBatchGroupOptions", () => {
6+
it("clamps oversized embedding batch poll intervals to the timeout budget", () => {
7+
const options = buildEmbeddingBatchGroupOptions(
8+
{
9+
requests: ["request-1"],
10+
wait: true,
11+
pollIntervalMs: Number.MAX_SAFE_INTEGER,
12+
timeoutMs: 60_000,
13+
concurrency: 1,
14+
},
15+
{
16+
maxRequests: 100,
17+
debugLabel: "embedding batch submit",
18+
},
19+
);
20+
21+
expect(options.pollIntervalMs).toBe(60_000);
22+
});
23+
24+
it("passes clamped poll intervals into batch group runners", async () => {
25+
const runGroup = vi.fn(async () => {});
26+
27+
await runEmbeddingBatchGroups({
28+
requests: ["request-1"],
29+
maxRequests: 100,
30+
wait: true,
31+
pollIntervalMs: Number.MAX_SAFE_INTEGER,
32+
timeoutMs: 60_000,
33+
concurrency: 1,
34+
debugLabel: "embedding batch submit",
35+
runGroup,
36+
});
37+
38+
expect(runGroup).toHaveBeenCalledWith(
39+
expect.objectContaining({
40+
pollIntervalMs: 60_000,
41+
timeoutMs: 60_000,
42+
}),
43+
);
44+
});
45+
46+
it("keeps timeout-safe oversized embedding batch poll intervals bounded", () => {
47+
const options = buildEmbeddingBatchGroupOptions(
48+
{
49+
requests: ["request-1"],
50+
wait: true,
51+
pollIntervalMs: Number.MAX_SAFE_INTEGER,
52+
timeoutMs: Number.MAX_SAFE_INTEGER,
53+
concurrency: 1,
54+
},
55+
{
56+
maxRequests: 100,
57+
debugLabel: "embedding batch submit",
58+
},
59+
);
60+
61+
expect(options.pollIntervalMs).toBe(MAX_SAFE_TIMEOUT_DELAY_MS);
62+
});
63+
});

packages/memory-host-sdk/src/host/batch-runner.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { resolveSafeTimeoutDelayMs } from "../../../gateway-client/src/timeouts.js";
12
import { splitBatchRequests } from "./batch-utils.js";
23
import { runWithConcurrency } from "./internal.js";
34

@@ -9,6 +10,20 @@ export type EmbeddingBatchExecutionParams = {
910
debug?: (message: string, data?: Record<string, unknown>) => void;
1011
};
1112

13+
function resolveEmbeddingBatchPollIntervalMs(params: {
14+
pollIntervalMs: number;
15+
timeoutMs: number;
16+
}): number {
17+
const safePollIntervalMs = resolveSafeTimeoutDelayMs(params.pollIntervalMs);
18+
const safeTimeoutMs =
19+
typeof params.timeoutMs === "number" &&
20+
Number.isFinite(params.timeoutMs) &&
21+
params.timeoutMs > 0
22+
? resolveSafeTimeoutDelayMs(params.timeoutMs)
23+
: safePollIntervalMs;
24+
return Math.min(safePollIntervalMs, safeTimeoutMs);
25+
}
26+
1227
export async function runEmbeddingBatchGroups<TRequest>(params: {
1328
requests: TRequest[];
1429
maxRequests: number;
@@ -23,23 +38,33 @@ export async function runEmbeddingBatchGroups<TRequest>(params: {
2338
groupIndex: number;
2439
groups: number;
2540
byCustomId: Map<string, number[]>;
41+
pollIntervalMs: number;
42+
timeoutMs: number;
2643
}) => Promise<void>;
2744
}): Promise<Map<string, number[]>> {
2845
if (params.requests.length === 0) {
2946
return new Map();
3047
}
3148
const groups = splitBatchRequests(params.requests, params.maxRequests);
3249
const byCustomId = new Map<string, number[]>();
50+
const pollIntervalMs = resolveEmbeddingBatchPollIntervalMs(params);
3351
const tasks = groups.map((group, groupIndex) => async () => {
34-
await params.runGroup({ group, groupIndex, groups: groups.length, byCustomId });
52+
await params.runGroup({
53+
group,
54+
groupIndex,
55+
groups: groups.length,
56+
byCustomId,
57+
pollIntervalMs,
58+
timeoutMs: params.timeoutMs,
59+
});
3560
});
3661

3762
params.debug?.(params.debugLabel, {
3863
requests: params.requests.length,
3964
groups: groups.length,
4065
wait: params.wait,
4166
concurrency: params.concurrency,
42-
pollIntervalMs: params.pollIntervalMs,
67+
pollIntervalMs,
4368
timeoutMs: params.timeoutMs,
4469
});
4570

@@ -51,11 +76,12 @@ export function buildEmbeddingBatchGroupOptions<TRequest>(
5176
params: { requests: TRequest[] } & EmbeddingBatchExecutionParams,
5277
options: { maxRequests: number; debugLabel: string },
5378
) {
79+
const pollIntervalMs = resolveEmbeddingBatchPollIntervalMs(params);
5480
return {
5581
requests: params.requests,
5682
maxRequests: options.maxRequests,
5783
wait: params.wait,
58-
pollIntervalMs: params.pollIntervalMs,
84+
pollIntervalMs,
5985
timeoutMs: params.timeoutMs,
6086
concurrency: params.concurrency,
6187
debug: params.debug,

0 commit comments

Comments
 (0)