Skip to content

Commit b240bee

Browse files
authored
Merge cb1827c into ea3bb92
2 parents ea3bb92 + cb1827c commit b240bee

3 files changed

Lines changed: 290 additions & 76 deletions

File tree

src/commitments/store-writer.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Per-store-path mutation gate for the commitments store. Mirrors the
2+
// in-process queue + cross-process file-lock pattern in
3+
// src/plugin-sdk/persistent-dedupe.ts (issue #81145).
4+
5+
import fs from "node:fs/promises";
6+
import path from "node:path";
7+
import { type FileLockOptions, withFileLock } from "../plugin-sdk/file-lock.js";
8+
9+
type CommitmentsStoreWriterTask = {
10+
fn: () => Promise<unknown>;
11+
resolve: (value: unknown) => void;
12+
reject: (reason: unknown) => void;
13+
};
14+
15+
type CommitmentsStoreWriterQueue = {
16+
running: boolean;
17+
pending: CommitmentsStoreWriterTask[];
18+
drainPromise: Promise<void> | null;
19+
};
20+
21+
const WRITER_QUEUES = new Map<string, CommitmentsStoreWriterQueue>();
22+
23+
// Matches src/plugin-sdk/persistent-dedupe.ts so both lock-protected stores share tuning.
24+
const DEFAULT_COMMITMENTS_LOCK_OPTIONS: FileLockOptions = {
25+
retries: {
26+
retries: 6,
27+
factor: 1.35,
28+
minTimeout: 8,
29+
maxTimeout: 180,
30+
randomize: true,
31+
},
32+
stale: 60_000,
33+
};
34+
35+
function getOrCreateWriterQueue(storePath: string): CommitmentsStoreWriterQueue {
36+
const existing = WRITER_QUEUES.get(storePath);
37+
if (existing) {
38+
return existing;
39+
}
40+
const created: CommitmentsStoreWriterQueue = {
41+
running: false,
42+
pending: [],
43+
drainPromise: null,
44+
};
45+
WRITER_QUEUES.set(storePath, created);
46+
return created;
47+
}
48+
49+
async function drainCommitmentsStoreWriterQueue(storePath: string): Promise<void> {
50+
const queue = WRITER_QUEUES.get(storePath);
51+
if (!queue) {
52+
return;
53+
}
54+
if (queue.drainPromise) {
55+
await queue.drainPromise;
56+
return;
57+
}
58+
queue.running = true;
59+
queue.drainPromise = (async () => {
60+
try {
61+
while (queue.pending.length > 0) {
62+
const task = queue.pending.shift();
63+
if (!task) {
64+
continue;
65+
}
66+
let result: unknown;
67+
let failed: unknown;
68+
let hasFailure = false;
69+
try {
70+
result = await task.fn();
71+
} catch (err) {
72+
hasFailure = true;
73+
failed = err;
74+
}
75+
if (hasFailure) {
76+
task.reject(failed);
77+
continue;
78+
}
79+
task.resolve(result);
80+
}
81+
} finally {
82+
queue.running = false;
83+
queue.drainPromise = null;
84+
if (queue.pending.length === 0) {
85+
WRITER_QUEUES.delete(storePath);
86+
} else {
87+
queueMicrotask(() => {
88+
void drainCommitmentsStoreWriterQueue(storePath);
89+
});
90+
}
91+
}
92+
})();
93+
await queue.drainPromise;
94+
}
95+
96+
// The advisory lockfile lives next to the data file; create the parent dir up
97+
// front so acquireFileLock does not ENOENT before the user fn ever runs.
98+
async function ensureCommitmentsStoreDir(storePath: string): Promise<void> {
99+
await fs.mkdir(path.dirname(storePath), { recursive: true });
100+
}
101+
102+
export async function runExclusiveCommitmentsStoreWrite<T>(
103+
storePath: string,
104+
fn: () => Promise<T>,
105+
): Promise<T> {
106+
if (!storePath || typeof storePath !== "string") {
107+
throw new Error(
108+
`runExclusiveCommitmentsStoreWrite: storePath must be a non-empty string, got ${JSON.stringify(
109+
storePath,
110+
)}`,
111+
);
112+
}
113+
const queue = getOrCreateWriterQueue(storePath);
114+
return await new Promise<T>((resolve, reject) => {
115+
const task: CommitmentsStoreWriterTask = {
116+
fn: async () => {
117+
await ensureCommitmentsStoreDir(storePath);
118+
return await withFileLock(storePath, DEFAULT_COMMITMENTS_LOCK_OPTIONS, fn);
119+
},
120+
resolve: (value) => resolve(value as T),
121+
reject,
122+
};
123+
queue.pending.push(task);
124+
void drainCommitmentsStoreWriterQueue(storePath);
125+
});
126+
}
127+
128+
export function clearCommitmentsStoreWriterQueuesForTest(): void {
129+
for (const queue of WRITER_QUEUES.values()) {
130+
for (const task of queue.pending) {
131+
task.reject(new Error("commitments store writer queue cleared for test"));
132+
}
133+
}
134+
WRITER_QUEUES.clear();
135+
}

src/commitments/store.test.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
listDueCommitmentsForSession,
88
listPendingCommitmentsForScope,
99
loadCommitmentStore,
10+
markCommitmentsAttempted,
11+
markCommitmentsStatus,
1012
saveCommitmentStore,
1113
} from "./store.js";
1214
import type { CommitmentRecord } from "./types.js";
@@ -232,4 +234,63 @@ describe("commitment store delivery selection", () => {
232234
expect(expiredCommitments[0]?.id).toBe("cm_interview");
233235
expect(expiredCommitments[0]?.status).toBe("expired");
234236
});
237+
238+
it("serializes concurrent markCommitmentsStatus on disjoint ids without losing a write (#13)", async () => {
239+
await useTempStateDir();
240+
await saveCommitmentStore(undefined, {
241+
version: 1,
242+
commitments: [
243+
commitment({ id: "cm_raceA", dedupeKey: "race-A" }),
244+
commitment({ id: "cm_raceB", dedupeKey: "race-B" }),
245+
],
246+
});
247+
248+
await Promise.all([
249+
markCommitmentsStatus({ ids: ["cm_raceA"], status: "dismissed", nowMs }),
250+
markCommitmentsStatus({ ids: ["cm_raceB"], status: "dismissed", nowMs }),
251+
]);
252+
253+
const after = await loadCommitmentStore();
254+
const byId = Object.fromEntries(after.commitments.map((c) => [c.id, c.status]));
255+
expect(byId.cm_raceA).toBe("dismissed");
256+
expect(byId.cm_raceB).toBe("dismissed");
257+
});
258+
259+
it("serializes concurrent markCommitmentsAttempted bumps without losing the counter (#13)", async () => {
260+
await useTempStateDir();
261+
await saveCommitmentStore(undefined, {
262+
version: 1,
263+
commitments: [commitment({ id: "cm_race_attempts", attempts: 0 })],
264+
});
265+
266+
await Promise.all(
267+
Array.from({ length: 5 }, () =>
268+
markCommitmentsAttempted({ ids: ["cm_race_attempts"], nowMs }),
269+
),
270+
);
271+
272+
const after = await loadCommitmentStore();
273+
expect(after.commitments[0]?.attempts).toBe(5);
274+
});
275+
276+
it("serializes a markCommitmentsStatus dismiss against a concurrent attempted bump (#13)", async () => {
277+
await useTempStateDir();
278+
await saveCommitmentStore(undefined, {
279+
version: 1,
280+
commitments: [
281+
commitment({ id: "cm_dismiss_target", dedupeKey: "dismiss-target" }),
282+
commitment({ id: "cm_attempt_target", dedupeKey: "attempt-target", attempts: 2 }),
283+
],
284+
});
285+
286+
await Promise.all([
287+
markCommitmentsStatus({ ids: ["cm_dismiss_target"], status: "dismissed", nowMs }),
288+
markCommitmentsAttempted({ ids: ["cm_attempt_target"], nowMs }),
289+
]);
290+
291+
const after = await loadCommitmentStore();
292+
const byId = Object.fromEntries(after.commitments.map((c) => [c.id, c]));
293+
expect(byId.cm_dismiss_target?.status).toBe("dismissed");
294+
expect(byId.cm_attempt_target?.attempts).toBe(3);
295+
});
235296
});

0 commit comments

Comments
 (0)