|
| 1 | +// Per-store-path mutation gate for the commitments store. Mirrors the |
| 2 | +// in-process queue + cross-process file-lock pattern in |
| 3 | +// src/plugin-sdk/persistent-dedupe.ts (issue #81145). |
| 4 | + |
| 5 | +import fs from "node:fs/promises"; |
| 6 | +import path from "node:path"; |
| 7 | +import { type FileLockOptions, withFileLock } from "../plugin-sdk/file-lock.js"; |
| 8 | + |
| 9 | +type CommitmentsStoreWriterTask = { |
| 10 | + fn: () => Promise<unknown>; |
| 11 | + resolve: (value: unknown) => void; |
| 12 | + reject: (reason: unknown) => void; |
| 13 | +}; |
| 14 | + |
| 15 | +type CommitmentsStoreWriterQueue = { |
| 16 | + running: boolean; |
| 17 | + pending: CommitmentsStoreWriterTask[]; |
| 18 | + drainPromise: Promise<void> | null; |
| 19 | +}; |
| 20 | + |
| 21 | +const WRITER_QUEUES = new Map<string, CommitmentsStoreWriterQueue>(); |
| 22 | + |
| 23 | +// Matches src/plugin-sdk/persistent-dedupe.ts so both lock-protected stores share tuning. |
| 24 | +const DEFAULT_COMMITMENTS_LOCK_OPTIONS: FileLockOptions = { |
| 25 | + retries: { |
| 26 | + retries: 6, |
| 27 | + factor: 1.35, |
| 28 | + minTimeout: 8, |
| 29 | + maxTimeout: 180, |
| 30 | + randomize: true, |
| 31 | + }, |
| 32 | + stale: 60_000, |
| 33 | +}; |
| 34 | + |
| 35 | +function getOrCreateWriterQueue(storePath: string): CommitmentsStoreWriterQueue { |
| 36 | + const existing = WRITER_QUEUES.get(storePath); |
| 37 | + if (existing) { |
| 38 | + return existing; |
| 39 | + } |
| 40 | + const created: CommitmentsStoreWriterQueue = { |
| 41 | + running: false, |
| 42 | + pending: [], |
| 43 | + drainPromise: null, |
| 44 | + }; |
| 45 | + WRITER_QUEUES.set(storePath, created); |
| 46 | + return created; |
| 47 | +} |
| 48 | + |
| 49 | +async function drainCommitmentsStoreWriterQueue(storePath: string): Promise<void> { |
| 50 | + const queue = WRITER_QUEUES.get(storePath); |
| 51 | + if (!queue) { |
| 52 | + return; |
| 53 | + } |
| 54 | + if (queue.drainPromise) { |
| 55 | + await queue.drainPromise; |
| 56 | + return; |
| 57 | + } |
| 58 | + queue.running = true; |
| 59 | + queue.drainPromise = (async () => { |
| 60 | + try { |
| 61 | + while (queue.pending.length > 0) { |
| 62 | + const task = queue.pending.shift(); |
| 63 | + if (!task) { |
| 64 | + continue; |
| 65 | + } |
| 66 | + let result: unknown; |
| 67 | + let failed: unknown; |
| 68 | + let hasFailure = false; |
| 69 | + try { |
| 70 | + result = await task.fn(); |
| 71 | + } catch (err) { |
| 72 | + hasFailure = true; |
| 73 | + failed = err; |
| 74 | + } |
| 75 | + if (hasFailure) { |
| 76 | + task.reject(failed); |
| 77 | + continue; |
| 78 | + } |
| 79 | + task.resolve(result); |
| 80 | + } |
| 81 | + } finally { |
| 82 | + queue.running = false; |
| 83 | + queue.drainPromise = null; |
| 84 | + if (queue.pending.length === 0) { |
| 85 | + WRITER_QUEUES.delete(storePath); |
| 86 | + } else { |
| 87 | + queueMicrotask(() => { |
| 88 | + void drainCommitmentsStoreWriterQueue(storePath); |
| 89 | + }); |
| 90 | + } |
| 91 | + } |
| 92 | + })(); |
| 93 | + await queue.drainPromise; |
| 94 | +} |
| 95 | + |
| 96 | +// The advisory lockfile lives next to the data file; create the parent dir up |
| 97 | +// front so acquireFileLock does not ENOENT before the user fn ever runs. |
| 98 | +async function ensureCommitmentsStoreDir(storePath: string): Promise<void> { |
| 99 | + await fs.mkdir(path.dirname(storePath), { recursive: true }); |
| 100 | +} |
| 101 | + |
| 102 | +export async function runExclusiveCommitmentsStoreWrite<T>( |
| 103 | + storePath: string, |
| 104 | + fn: () => Promise<T>, |
| 105 | +): Promise<T> { |
| 106 | + if (!storePath || typeof storePath !== "string") { |
| 107 | + throw new Error( |
| 108 | + `runExclusiveCommitmentsStoreWrite: storePath must be a non-empty string, got ${JSON.stringify( |
| 109 | + storePath, |
| 110 | + )}`, |
| 111 | + ); |
| 112 | + } |
| 113 | + const queue = getOrCreateWriterQueue(storePath); |
| 114 | + return await new Promise<T>((resolve, reject) => { |
| 115 | + const task: CommitmentsStoreWriterTask = { |
| 116 | + fn: async () => { |
| 117 | + await ensureCommitmentsStoreDir(storePath); |
| 118 | + return await withFileLock(storePath, DEFAULT_COMMITMENTS_LOCK_OPTIONS, fn); |
| 119 | + }, |
| 120 | + resolve: (value) => resolve(value as T), |
| 121 | + reject, |
| 122 | + }; |
| 123 | + queue.pending.push(task); |
| 124 | + void drainCommitmentsStoreWriterQueue(storePath); |
| 125 | + }); |
| 126 | +} |
| 127 | + |
| 128 | +export function clearCommitmentsStoreWriterQueuesForTest(): void { |
| 129 | + for (const queue of WRITER_QUEUES.values()) { |
| 130 | + for (const task of queue.pending) { |
| 131 | + task.reject(new Error("commitments store writer queue cleared for test")); |
| 132 | + } |
| 133 | + } |
| 134 | + WRITER_QUEUES.clear(); |
| 135 | +} |
0 commit comments