Skip to content

Commit e1599ee

Browse files
committed
fix: trace2 watcher basename matching and concurrent flush/watcher race
Bug 1: fs.watch provides basename-only paths on most platforms, so the comparison against the full absolute traceFilePath never matched. Now compares using path.basename() so the watcher correctly triggers readTraceDelta for real-time hook progress streaming. Bug 2: readTraceDelta is shared between the forked watcher fiber and the flush call. Both could enter concurrently at the async readFileString boundary, causing duplicate line processing. Wraps readTraceDelta with a Semaphore(1) mutex to serialize access to processedChars and lineBuffer.
1 parent 7984dc0 commit e1599ee

File tree

1 file changed

+26
-19
lines changed

1 file changed

+26
-19
lines changed

apps/server/src/git/Layers/GitCore.ts

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
Result,
1313
Schema,
1414
Scope,
15+
Semaphore,
1516
Stream,
1617
} from "effect";
1718
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
@@ -361,30 +362,36 @@ const createTrace2Monitor = Effect.fn(function* (
361362
}
362363
});
363364

364-
const readTraceDelta = fs.readFileString(traceFilePath).pipe(
365-
Effect.catch(() => Effect.succeed("")),
366-
Effect.flatMap((delta) =>
367-
Effect.gen(function* () {
368-
if (delta.length <= processedChars) {
369-
return;
370-
}
371-
const appended = delta.slice(processedChars);
372-
processedChars = delta.length;
373-
lineBuffer += appended;
374-
let newlineIndex = lineBuffer.indexOf("\n");
375-
while (newlineIndex >= 0) {
376-
const line = lineBuffer.slice(0, newlineIndex);
377-
lineBuffer = lineBuffer.slice(newlineIndex + 1);
378-
yield* handleTraceLine(line);
379-
newlineIndex = lineBuffer.indexOf("\n");
380-
}
381-
}),
365+
const deltaMutex = Semaphore.makeUnsafe(1);
366+
const readTraceDelta = deltaMutex.withPermit(
367+
fs.readFileString(traceFilePath).pipe(
368+
Effect.catch(() => Effect.succeed("")),
369+
Effect.flatMap((delta) =>
370+
Effect.gen(function* () {
371+
if (delta.length <= processedChars) {
372+
return;
373+
}
374+
const appended = delta.slice(processedChars);
375+
processedChars = delta.length;
376+
lineBuffer += appended;
377+
let newlineIndex = lineBuffer.indexOf("\n");
378+
while (newlineIndex >= 0) {
379+
const line = lineBuffer.slice(0, newlineIndex);
380+
lineBuffer = lineBuffer.slice(newlineIndex + 1);
381+
yield* handleTraceLine(line);
382+
newlineIndex = lineBuffer.indexOf("\n");
383+
}
384+
}),
385+
),
382386
),
383387
);
388+
const traceFileName = path.basename(traceFilePath);
384389
const watchTraceFile = Stream.runForEach(fs.watch(traceFilePath), (event) => {
385390
const eventPath = event.path;
386391
const isTargetTraceEvent =
387-
eventPath === traceFilePath || path.resolve(eventPath) === traceFilePath;
392+
eventPath === traceFilePath ||
393+
eventPath === traceFileName ||
394+
path.basename(eventPath) === traceFileName;
388395
if (!isTargetTraceEvent) return Effect.void;
389396
return readTraceDelta;
390397
}).pipe(Effect.ignoreCause({ log: true }));

0 commit comments

Comments
 (0)