Skip to content

Commit e339586

Browse files
fix(plugin-state): evict current namespace on plugin row cap
Make plugin-state enforce the plugin-wide live-row fuse by evicting only from the namespace currently being written, preserving sibling namespace rows and still failing atomically when the current namespace cannot free enough rows. Raise the plugin-wide cap to 6,000 rows, keep Telegram's persistent message-cache namespace at 3,000 entries, and document the updated SDK runtime contract. Harden legacy plugin-state import so capacity pressure cannot archive a source after losing imported keys, with focused regression coverage for Telegram-shaped namespaces and migration rollback. Also restore the Docker runtime-assets preflight step in full release validation so release workflow contract tests stay aligned. Verification: focused plugin-state, migration, Telegram, workflow-contract, lint, deprecated-API, diff-check, Blacksmith Testbox, CI, CodeQL, Workflow Sanity, OpenGrep, and autoreview all passed on PR head fee021c. Co-authored-by: Keshav's Bot <keshavbotagent@gmail.com>
1 parent 90f3007 commit e339586

10 files changed

Lines changed: 291 additions & 40 deletions

File tree

.github/workflows/full-release-validation.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,16 @@ jobs:
240240
fetch-depth: 1
241241
persist-credentials: true
242242

243+
- name: Verify Docker runtime-assets prune path
244+
env:
245+
DOCKER_BUILDKIT: "1"
246+
run: |
247+
set -euo pipefail
248+
timeout --kill-after=30s 35m docker build \
249+
--target runtime-assets \
250+
--build-arg OPENCLAW_EXTENSIONS="diagnostics-otel,codex" \
251+
.
252+
243253
- name: Build and smoke test final Docker runtime image
244254
env:
245255
DOCKER_BUILDKIT: "1"

docs/plugins/sdk-runtime.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ two-party event loops that do not go through the shared inbound reply runner.
524524
await store.clear();
525525
```
526526

527-
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
527+
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 6,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry. When a write would exceed the plugin row cap, the runtime may evict the oldest live rows from the namespace being written; sibling namespaces are not evicted for that write, and the write still fails if the namespace cannot free enough rows.
528528

529529
<Warning>
530530
Bundled plugins only in this release.

extensions/telegram/src/message-cache.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
createTelegramMessageCache,
88
resetTelegramMessageCacheBucketsForTest,
99
resolveTelegramMessageCachePath,
10+
TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES,
1011
type TelegramMessageCachePersistentStore,
1112
} from "./message-cache.js";
1213

@@ -24,7 +25,7 @@ type PersistedCacheValue = {
2425

2526
let persistentStoreId = 0;
2627

27-
function createMemoryPersistentStore(maxEntries = 1000): {
28+
function createMemoryPersistentStore(maxEntries = TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES): {
2829
bucketKey: string;
2930
entries: Map<string, PersistedCacheValue>;
3031
store: TelegramMessageCachePersistentStore;

extensions/telegram/src/message-cache.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type TelegramCachedMessageObservation = {
8282
type TelegramEmbeddedReplyMessage = NonNullable<Message["reply_to_message"]>;
8383

8484
const DEFAULT_MAX_MESSAGES = 5000;
85-
export const TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES = 1000;
85+
export const TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES = 3000;
8686
export const TELEGRAM_MESSAGE_CACHE_PERSISTENT_NAMESPACE = "telegram.message-cache";
8787
const PERSISTENT_BUCKET_KEY = `plugin-state:${TELEGRAM_MESSAGE_CACHE_PERSISTENT_NAMESPACE}`;
8888
const COMPACT_THRESHOLD_RATIO = 2;

src/commands/doctor-state-migrations.test.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
createPluginStateKeyedStore,
88
resetPluginStateStoreForTests,
99
} from "../plugin-state/plugin-state-store.js";
10+
import { seedPluginStateEntriesForTests } from "../plugin-state/plugin-state-store.test-helpers.js";
1011
import {
1112
autoMigrateLegacyStateDir,
1213
autoMigrateLegacyState,
@@ -711,6 +712,69 @@ describe("doctor legacy state migrations", () => {
711712
});
712713
});
713714

715+
it("keeps plugin-state import source when plugin cap eviction drops an imported row", async () => {
716+
const root = await makeTempRoot();
717+
const sourcePath = path.join(root, "legacy-cache.json");
718+
fs.writeFileSync(sourcePath, "legacy", "utf-8");
719+
mockedChannelMigrationPlans.plans = [
720+
{
721+
kind: "plugin-state-import",
722+
label: "Test capped cache",
723+
sourcePath,
724+
targetPath: "plugin state:test.capped-cache",
725+
pluginId: "telegram",
726+
namespace: "test.capped-cache",
727+
maxEntries: 6_000,
728+
scopeKey: "scope",
729+
cleanupSource: "rename",
730+
readEntries: () => [
731+
{ key: "first", value: { body: "first" } },
732+
{ key: "second", value: { body: "second" } },
733+
],
734+
},
735+
];
736+
737+
await withStateDir(root, async () => {
738+
seedPluginStateEntriesForTests(
739+
Array.from({ length: 5_999 }, (_, index) => ({
740+
pluginId: "telegram",
741+
namespace: "test.sibling-cache",
742+
key: `sibling-${index}`,
743+
value: { body: "sibling" },
744+
})),
745+
);
746+
});
747+
resetPluginStateStoreForTests();
748+
749+
const detected = await detectLegacyStateMigrations({
750+
cfg: {},
751+
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
752+
});
753+
const result = await runLegacyStateMigrations({ detected });
754+
755+
expect(result.warnings).toStrictEqual([
756+
"Skipped migrating Test capped cache because plugin state has room for 1 of 2 missing entries; left legacy source in place",
757+
]);
758+
expect(result.changes).not.toContain("Migrated 2 Test capped cache entries → plugin state");
759+
expect(result.changes).not.toContain(
760+
`Archived Test capped cache legacy source → ${sourcePath}.migrated`,
761+
);
762+
expect(fs.existsSync(sourcePath)).toBe(true);
763+
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(false);
764+
765+
await withStateDir(root, async () => {
766+
const store = createPluginStateKeyedStore<{ body: string }>("telegram", {
767+
namespace: "test.capped-cache",
768+
maxEntries: 6_000,
769+
});
770+
const valuesByKey = new Map(
771+
(await store.entries()).map(({ key, value }) => [key, value.body]),
772+
);
773+
expect(valuesByKey.has("scope:first")).toBe(false);
774+
expect(valuesByKey.has("scope:second")).toBe(false);
775+
});
776+
});
777+
714778
it("routes legacy state to the default agent entry", async () => {
715779
const root = await makeTempRoot();
716780
const cfg: OpenClawConfig = {

src/infra/state-migrations.ts

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ import { canonicalizeMainSessionAlias } from "../config/sessions/main-session.js
1919
import type { SessionScope } from "../config/sessions/types.js";
2020
import type { OpenClawConfig } from "../config/types.openclaw.js";
2121
import { createSubsystemLogger } from "../logging/subsystem.js";
22-
import { createPluginStateKeyedStore } from "../plugin-state/plugin-state-store.js";
22+
import {
23+
countPluginStateLiveEntries,
24+
createPluginStateKeyedStore,
25+
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
26+
} from "../plugin-state/plugin-state-store.js";
2327
import {
2428
buildAgentMainSessionKey,
2529
DEFAULT_AGENT_ID,
@@ -127,6 +131,15 @@ function resolvePluginStateImportTargetKey(scopeKey: string, key: string): strin
127131
return scopeKey ? `${scopeKey}:${key}` : key;
128132
}
129133

134+
function findMissingKey(expected: Set<string>, actual: Set<string>): string | undefined {
135+
for (const key of expected) {
136+
if (!actual.has(key)) {
137+
return key;
138+
}
139+
}
140+
return undefined;
141+
}
142+
130143
async function withPluginStateImportEnv<T>(
131144
plan: Extract<ChannelLegacyStateMigrationPlan, { kind: "plugin-state-import" }>,
132145
run: () => Promise<T>,
@@ -155,23 +168,41 @@ async function runLegacyMigrationPlans(
155168
for (const plan of plans) {
156169
if (plan.kind === "plugin-state-import") {
157170
await withPluginStateImportEnv(plan, async () => {
158-
let storeEntries: Array<{ key: string }> = [];
171+
let storeEntries: Array<{ key: string; value: unknown }> = [];
172+
let pluginEntryCount = 0;
159173
const store = createPluginStateKeyedStore<unknown>(plan.pluginId, {
160174
namespace: plan.namespace,
161175
maxEntries: plan.maxEntries,
162176
});
163177
try {
164178
storeEntries = await store.entries();
179+
pluginEntryCount = countPluginStateLiveEntries(plan.pluginId);
165180
} catch (err) {
166181
warnings.push(
167182
`Failed reading ${plan.label} plugin state before migration: ${String(err)}`,
168183
);
169184
return;
170185
}
171186
const existingKeys = new Set(storeEntries.map(({ key }) => key));
187+
const existingValuesByKey = new Map(storeEntries.map(({ key, value }) => [key, value]));
188+
const expectedKeys = new Set(existingKeys);
172189
let remainingCapacity = Math.max(0, plan.maxEntries - storeEntries.length);
173190
const entries = await plan.readEntries();
191+
const missingEntries = entries.filter(
192+
({ key }) => !existingKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
193+
);
194+
const pluginRemainingCapacity = Math.max(
195+
0,
196+
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN - pluginEntryCount,
197+
);
198+
if (missingEntries.length > pluginRemainingCapacity) {
199+
warnings.push(
200+
`Skipped migrating ${plan.label} because plugin state has room for ${pluginRemainingCapacity} of ${missingEntries.length} missing entries; left legacy source in place`,
201+
);
202+
return;
203+
}
174204
let imported = 0;
205+
const importedKeys: string[] = [];
175206
for (const entry of entries) {
176207
const targetKey = resolvePluginStateImportTargetKey(plan.scopeKey, entry.key);
177208
if (existingKeys.has(targetKey)) {
@@ -182,7 +213,26 @@ async function runLegacyMigrationPlans(
182213
}
183214
try {
184215
await store.register(targetKey, entry.value);
216+
const nextExpectedKeys = new Set(expectedKeys);
217+
nextExpectedKeys.add(targetKey);
218+
const liveKeys = new Set((await store.entries()).map(({ key }) => key));
219+
const missingKey = findMissingKey(nextExpectedKeys, liveKeys);
220+
if (missingKey) {
221+
for (const importedKey of importedKeys.toReversed()) {
222+
await store.delete(importedKey);
223+
}
224+
await store.delete(targetKey);
225+
if (existingValuesByKey.has(missingKey)) {
226+
await store.register(missingKey, existingValuesByKey.get(missingKey));
227+
}
228+
warnings.push(
229+
`Stopped migrating ${plan.label} because plugin state cap evicted ${missingKey}; left legacy source in place`,
230+
);
231+
return;
232+
}
233+
expectedKeys.add(targetKey);
185234
existingKeys.add(targetKey);
235+
importedKeys.push(targetKey);
186236
remainingCapacity--;
187237
imported++;
188238
} catch (err) {
@@ -194,10 +244,14 @@ async function runLegacyMigrationPlans(
194244
`Migrated ${imported} ${plan.label} ${imported === 1 ? "entry" : "entries"} → plugin state`,
195245
);
196246
}
247+
let cleanupKeys = existingKeys;
248+
if (plan.cleanupSource === "rename") {
249+
cleanupKeys = expectedKeys;
250+
}
197251
const allEntriesCovered =
198252
entries.length > 0 &&
199253
entries.every(({ key }) =>
200-
existingKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
254+
cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
201255
);
202256
if (allEntriesCovered && plan.cleanupSource === "rename" && fileExists(plan.sourcePath)) {
203257
const archivedPath = `${plan.sourcePath}.migrated`;

src/plugin-state/plugin-state-store.e2e.test.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,9 @@ describe("limits", () => {
196196

197197
it("enforces the per-plugin live-row cap", async () => {
198198
await withOpenClawTestState({ label: "e2e-limit-plugin" }, async () => {
199-
// Spread MAX_ENTRIES_PER_PLUGIN rows across several namespaces so
200-
// namespace eviction never fires (each namespace has generous room).
199+
// Fill the plugin budget outside the namespace that attempts the write.
201200
const nsCount = 10;
202-
const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100
201+
const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount;
203202
seedPluginStateEntriesForTests(
204203
Array.from({ length: MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN }, (_, index) => {
205204
const ns = Math.floor(index / perNs);
@@ -213,8 +212,8 @@ describe("limits", () => {
213212
}),
214213
);
215214
const store = createPluginStateKeyedStore("fixture-plugin", {
216-
namespace: "ns-0",
217-
maxEntries: perNs + 1,
215+
namespace: "overflow-ns",
216+
maxEntries: 10,
218217
});
219218

220219
// One more row tips over the plugin-wide limit.

src/plugin-state/plugin-state-store.sqlite.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ const PLUGIN_STATE_SCHEMA_VERSION = 1;
1616
const PLUGIN_STATE_DIR_MODE = 0o700;
1717
const PLUGIN_STATE_FILE_MODE = 0o600;
1818
const PLUGIN_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
19-
const MAX_ENTRIES_PER_PLUGIN = 1_000;
19+
// Plugin-wide fuse only; namespace maxEntries still owns normal cache eviction.
20+
const MAX_ENTRIES_PER_PLUGIN = 6_000;
2021

2122
export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536;
2223
export const MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = MAX_ENTRIES_PER_PLUGIN;
@@ -421,7 +422,24 @@ function enforcePostRegisterLimits(params: {
421422
| CountRow
422423
| undefined,
423424
);
424-
if (pluginCount > MAX_ENTRIES_PER_PLUGIN) {
425+
if (pluginCount <= MAX_ENTRIES_PER_PLUGIN) {
426+
return;
427+
}
428+
429+
// Shed rows from the namespace that grew before failing the plugin write.
430+
params.store.statements.deleteOldestNamespace.run(
431+
params.pluginId,
432+
params.namespace,
433+
params.protectedKey,
434+
params.now,
435+
pluginCount - MAX_ENTRIES_PER_PLUGIN,
436+
);
437+
const remainingPluginCount = countRow(
438+
params.store.statements.countLivePlugin.get(params.pluginId, params.now) as
439+
| CountRow
440+
| undefined,
441+
);
442+
if (remainingPluginCount > MAX_ENTRIES_PER_PLUGIN) {
425443
throw createPluginStateError({
426444
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
427445
operation: "register",
@@ -609,6 +627,20 @@ export function pluginStateEntries(params: {
609627
}
610628
}
611629

630+
export function countPluginStateLiveEntries(pluginId: string): number {
631+
try {
632+
const { statements } = openPluginStateDatabase("entries");
633+
return countRow(statements.countLivePlugin.get(pluginId, Date.now()) as CountRow | undefined);
634+
} catch (error) {
635+
throw wrapPluginStateError(
636+
error,
637+
"entries",
638+
"PLUGIN_STATE_READ_FAILED",
639+
"Failed to count plugin state entries.",
640+
);
641+
}
642+
}
643+
612644
export function pluginStateClear(params: { pluginId: string; namespace: string }): void {
613645
try {
614646
const { statements } = openPluginStateDatabase("clear");

0 commit comments

Comments
 (0)