Skip to content

Commit 983fd77

Browse files
authored
fix(memory-core): stream embedding cache seed during reindex
- stream safe-reindex embedding-cache seeding with SQLite iterate()
- avoid no-op empty-cache transactions and keep regression coverage explicit
- supersedes #73067

Thanks @parkertoddbrooks.
1 parent 2057713 commit 983fd77

3 files changed

Lines changed: 101 additions & 19 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
3636
- Plugin SDK: fall back from partial bundled plugin directory overrides to package source public surfaces while preserving `OPENCLAW_DISABLE_BUNDLED_PLUGINS` as a hard disable. (#72817) Thanks @serkonyc.
3737
- Agents/ACPX: stop forwarding Codex ACP timeout config controls that Codex rejects while preserving OpenClaw's run-timeout watchdog for ACP subagents. Fixes #73052. Thanks @pfrederiksen and @richa65.
3838
- Memory Core: stream fallback vector search scoring with a bounded top-K result set so large indexes do not materialize every chunk embedding when sqlite-vec is unavailable. (#73069) Thanks @parkertoddbrooks.
39+
- Memory Core: stream embedding-cache seeding during safe reindex so large local caches do not materialize every row into the V8 heap before the atomic rebuild. (#73067) Thanks @parkertoddbrooks.
3940
- Memory/Ollama: add `memorySearch.remote.nonBatchConcurrency` for inline embedding indexing, default Ollama non-batch indexing to one request at a time, and keep batch concurrency separate from non-batch concurrency so local embedding backfills avoid timeout storms on smaller hosts. Carries forward #57733. Thanks @itilys.
4041
- macOS app: update Peekaboo, ElevenLabsKit, and MLX TTS helper dependencies, make canvas file watching and config/exec-approval state writes reliable under concurrent app/test activity, and keep the app plus helper builds warning-free. Thanks @Blaizzy.
4142
- iOS app: refresh SwiftPM/XcodeGen source hygiene, make app, extension, watch, and curated shared Swift files pass the prebuild SwiftFormat and SwiftLint checks, move relay registration off deprecated StoreKit receipt APIs, and keep simulator builds and logic tests warning-free. Thanks @ngutman.

extensions/memory-core/src/memory/index.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,72 @@ describe("memory index", () => {
445445
);
446446
});
447447

448+
it("streams embedding cache rows during safe reindex", async () => {
449+
vi.stubEnv("OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX", "0");
450+
type EmbeddingCacheRow = {
451+
provider: string;
452+
model: string;
453+
provider_key: string;
454+
hash: string;
455+
embedding: string;
456+
dims: number | null;
457+
updated_at: number;
458+
};
459+
type StatementWithAll = {
460+
all: () => EmbeddingCacheRow[];
461+
};
462+
463+
const cfg = createCfg({
464+
storePath: path.join(workspaceDir, "index-cache-seed-stream.sqlite"),
465+
cacheEnabled: true,
466+
});
467+
const manager = await getPersistentManager(cfg);
468+
await manager.sync({ reason: "test" });
469+
470+
// Safe reindex streams cache rows from the original database and writes
471+
// them into a temporary database, so the SELECT spy belongs on this handle.
472+
const sourceDb = (
473+
manager as unknown as {
474+
db: {
475+
prepare: (sql: string) => unknown;
476+
};
477+
}
478+
).db;
479+
const originalPrepare = sourceDb.prepare.bind(sourceDb);
480+
const cachedRows = (
481+
originalPrepare(
482+
"SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM embedding_cache",
483+
) as StatementWithAll
484+
).all();
485+
expect(cachedRows.length).toBeGreaterThan(0);
486+
487+
const beforeCalls = embedBatchCalls;
488+
const prepareSpy = vi.spyOn(sourceDb, "prepare").mockImplementation((sql: string) => {
489+
if (
490+
sql.includes(
491+
"SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM embedding_cache",
492+
)
493+
) {
494+
return {
495+
all: () => {
496+
throw new Error("embedding cache seed must stream rows via iterate()");
497+
},
498+
iterate: () => cachedRows[Symbol.iterator](),
499+
};
500+
}
501+
return originalPrepare(sql);
502+
});
503+
504+
try {
505+
(manager as unknown as { dirty: boolean }).dirty = true;
506+
await manager.sync({ reason: "test", force: true });
507+
} finally {
508+
prepareSpy.mockRestore();
509+
}
510+
511+
expect(embedBatchCalls).toBe(beforeCalls);
512+
});
513+
448514
it("builds FTS index and returns search results when no embedding provider is available", async () => {
449515
forceNoProvider = true;
450516

extensions/memory-core/src/memory/manager-sync-ops.ts

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -308,16 +308,17 @@ export abstract class MemoryManagerSyncOps {
308308
return openMemoryDatabaseAtPath(dbPath, this.settings.store.vector.enabled);
309309
}
310310

311-
private seedEmbeddingCache(sourceDb: DatabaseSync): void {
311+
private async seedEmbeddingCache(sourceDb: DatabaseSync): Promise<void> {
312312
if (!this.cache.enabled) {
313313
return;
314314
}
315+
let transactionStarted = false;
315316
try {
316317
const rows = sourceDb
317318
.prepare(
318319
`SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`,
319320
)
320-
.all() as Array<{
321+
.iterate() as IterableIterator<{
321322
provider: string;
322323
model: string;
323324
provider_key: string;
@@ -326,19 +327,23 @@ export abstract class MemoryManagerSyncOps {
326327
dims: number | null;
327328
updated_at: number;
328329
}>;
329-
if (!rows.length) {
330-
return;
331-
}
332-
const insert = this.db.prepare(
333-
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)
334-
VALUES (?, ?, ?, ?, ?, ?, ?)
335-
ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET
336-
embedding=excluded.embedding,
337-
dims=excluded.dims,
338-
updated_at=excluded.updated_at`,
339-
);
340-
this.db.exec("BEGIN");
330+
// Keep gateway health probes responsive while rebuilding large caches.
331+
const SEED_EMBEDDING_YIELD_EVERY = 1000;
332+
let rowCount = 0;
333+
let insert: ReturnType<DatabaseSync["prepare"]> | null = null;
341334
for (const row of rows) {
335+
if (!insert) {
336+
insert = this.db.prepare(
337+
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)
338+
VALUES (?, ?, ?, ?, ?, ?, ?)
339+
ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET
340+
embedding=excluded.embedding,
341+
dims=excluded.dims,
342+
updated_at=excluded.updated_at`,
343+
);
344+
this.db.exec("BEGIN");
345+
transactionStarted = true;
346+
}
342347
insert.run(
343348
row.provider,
344349
row.model,
@@ -348,12 +353,22 @@ export abstract class MemoryManagerSyncOps {
348353
row.dims,
349354
row.updated_at,
350355
);
356+
rowCount += 1;
357+
if (rowCount % SEED_EMBEDDING_YIELD_EVERY === 0) {
358+
await new Promise<void>((resolve) => {
359+
setImmediate(resolve);
360+
});
361+
}
362+
}
363+
if (transactionStarted) {
364+
this.db.exec("COMMIT");
351365
}
352-
this.db.exec("COMMIT");
353366
} catch (err) {
354-
try {
355-
this.db.exec("ROLLBACK");
356-
} catch {}
367+
if (transactionStarted) {
368+
try {
369+
this.db.exec("ROLLBACK");
370+
} catch {}
371+
}
357372
throw err;
358373
}
359374
}
@@ -1167,7 +1182,7 @@ export abstract class MemoryManagerSyncOps {
11671182
targetPath: dbPath,
11681183
tempPath: tempDbPath,
11691184
build: async () => {
1170-
this.seedEmbeddingCache(originalDb);
1185+
await this.seedEmbeddingCache(originalDb);
11711186
const shouldSyncMemory = this.sources.has("memory");
11721187
const shouldSyncSessions = this.shouldSyncSessions(
11731188
{ reason: params.reason, force: params.force },

0 commit comments

Comments
 (0)