1+ import { randomUUID } from "node:crypto" ;
12import fs from "node:fs/promises" ;
23import path from "node:path" ;
3- import { SessionManager } from "@mariozechner/pi-coding-agent" ;
4+ import { StringDecoder } from "node:string_decoder" ;
5+ import { CURRENT_SESSION_VERSION , type SessionManager } from "@mariozechner/pi-coding-agent" ;
46import {
57 acquireSessionWriteLock ,
68 emitSessionTranscriptUpdate ,
79 runAgentHarnessBeforeMessageWriteHook ,
810 type AgentMessage ,
911} from "openclaw/plugin-sdk/agent-harness-runtime" ;
1012
13+ const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024 ;
14+ const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024 ;
15+
16+ type TranscriptLeafInfo = {
17+ leafId ?: string ;
18+ hasParentLinkedEntries : boolean ;
19+ nonSessionEntryCount : number ;
20+ } ;
21+
1122export async function mirrorCodexAppServerTranscript ( params : {
1223 sessionFile : string ;
1324 sessionKey ?: string ;
@@ -29,7 +40,6 @@ export async function mirrorCodexAppServerTranscript(params: {
2940 } ) ;
3041 try {
3142 const existingIdempotencyKeys = await readTranscriptIdempotencyKeys ( params . sessionFile ) ;
32- const sessionManager = SessionManager . open ( params . sessionFile ) ;
3343 for ( const [ index , message ] of messages . entries ( ) ) {
3444 const idempotencyKey = params . idempotencyScope
3545 ? `${ params . idempotencyScope } :${ message . role } :${ index } `
@@ -55,7 +65,10 @@ export async function mirrorCodexAppServerTranscript(params: {
5565 idempotencyKey,
5666 }
5767 : nextMessage ) as unknown as Parameters < SessionManager [ "appendMessage" ] > [ 0 ] ;
58- sessionManager . appendMessage ( messageToAppend ) ;
68+ await appendCodexAppServerTranscriptMessage ( {
69+ transcriptPath : params . sessionFile ,
70+ message : messageToAppend ,
71+ } ) ;
5972 if ( idempotencyKey ) {
6073 existingIdempotencyKeys . add ( idempotencyKey ) ;
6174 }
@@ -71,6 +84,202 @@ export async function mirrorCodexAppServerTranscript(params: {
7184 }
7285}
7386
87+ async function appendCodexAppServerTranscriptMessage ( params : {
88+ transcriptPath : string ;
89+ message : unknown ;
90+ } ) : Promise < void > {
91+ await ensureTranscriptHeader ( params . transcriptPath ) ;
92+ const stat = await fs . stat ( params . transcriptPath ) . catch ( ( ) => null ) ;
93+ let leafInfo : TranscriptLeafInfo = await readTranscriptLeafInfo ( params . transcriptPath ) . catch (
94+ ( ) => ( {
95+ hasParentLinkedEntries : false ,
96+ nonSessionEntryCount : 0 ,
97+ } ) ,
98+ ) ;
99+ const hasLinearEntries = ! leafInfo . hasParentLinkedEntries && leafInfo . nonSessionEntryCount > 0 ;
100+ const shouldRawAppend = hasLinearEntries && ( stat ?. size ?? 0 ) > SESSION_MANAGER_APPEND_MAX_BYTES ;
101+ if ( hasLinearEntries && ! shouldRawAppend ) {
102+ const migrated = await migrateLinearTranscriptToParentLinked ( params . transcriptPath ) ;
103+ leafInfo = {
104+ ...( migrated . leafId ? { leafId : migrated . leafId } : { } ) ,
105+ hasParentLinkedEntries : Boolean ( migrated . leafId ) ,
106+ nonSessionEntryCount : leafInfo . nonSessionEntryCount ,
107+ } ;
108+ }
109+ const entry = {
110+ type : "message" ,
111+ id : randomUUID ( ) ,
112+ ...( shouldRawAppend ? { } : { parentId : leafInfo . leafId ?? null } ) ,
113+ timestamp : new Date ( ) . toISOString ( ) ,
114+ message : params . message ,
115+ } ;
116+ await fs . appendFile ( params . transcriptPath , `${ JSON . stringify ( entry ) } \n` , "utf-8" ) ;
117+ }
118+
119+ async function ensureTranscriptHeader ( transcriptPath : string ) : Promise < void > {
120+ const stat = await fs . stat ( transcriptPath ) . catch ( ( ) => null ) ;
121+ if ( stat ?. isFile ( ) && stat . size > 0 ) {
122+ return ;
123+ }
124+ await fs . mkdir ( path . dirname ( transcriptPath ) , { recursive : true } ) ;
125+ const header = {
126+ type : "session" ,
127+ version : CURRENT_SESSION_VERSION ,
128+ id : randomUUID ( ) ,
129+ timestamp : new Date ( ) . toISOString ( ) ,
130+ cwd : process . cwd ( ) ,
131+ } ;
132+ await fs . writeFile ( transcriptPath , `${ JSON . stringify ( header ) } \n` , {
133+ encoding : "utf-8" ,
134+ mode : 0o600 ,
135+ flag : stat ?. isFile ( ) ? "w" : "wx" ,
136+ } ) ;
137+ }
138+
139+ async function readTranscriptLeafInfo ( transcriptPath : string ) : Promise < TranscriptLeafInfo > {
140+ const handle = await fs . open ( transcriptPath , "r" ) ;
141+ try {
142+ const decoder = new StringDecoder ( "utf8" ) ;
143+ const buffer = Buffer . allocUnsafe ( TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES ) ;
144+ let carry = "" ;
145+ let leafId : string | undefined ;
146+ let hasParentLinkedEntries = false ;
147+ let nonSessionEntryCount = 0 ;
148+ while ( true ) {
149+ const { bytesRead } = await handle . read ( buffer , 0 , buffer . length , null ) ;
150+ if ( bytesRead <= 0 ) {
151+ break ;
152+ }
153+ const text = carry + decoder . write ( buffer . subarray ( 0 , bytesRead ) ) ;
154+ const lines = text . split ( / \r ? \n / ) ;
155+ carry = lines . pop ( ) ?? "" ;
156+ for ( const line of lines ) {
157+ if ( lineHasNonSessionEntry ( line ) ) {
158+ nonSessionEntryCount += 1 ;
159+ }
160+ const id = lineParentLinkedEntryId ( line ) ;
161+ if ( id ) {
162+ leafId = id ;
163+ hasParentLinkedEntries = true ;
164+ }
165+ }
166+ await yieldTranscriptAppendScan ( ) ;
167+ }
168+ const tail = carry + decoder . end ( ) ;
169+ if ( lineHasNonSessionEntry ( tail ) ) {
170+ nonSessionEntryCount += 1 ;
171+ }
172+ const id = lineParentLinkedEntryId ( tail ) ;
173+ if ( id ) {
174+ leafId = id ;
175+ hasParentLinkedEntries = true ;
176+ }
177+ return {
178+ ...( leafId ? { leafId } : { } ) ,
179+ hasParentLinkedEntries,
180+ nonSessionEntryCount,
181+ } ;
182+ } finally {
183+ await handle . close ( ) ;
184+ }
185+ }
186+
187+ async function migrateLinearTranscriptToParentLinked ( transcriptPath : string ) : Promise < {
188+ leafId ?: string ;
189+ } > {
190+ const raw = await fs . readFile ( transcriptPath , "utf-8" ) ;
191+ const existingIds = new Set < string > ( ) ;
192+ const output : string [ ] = [ ] ;
193+ let previousId : string | null = null ;
194+ let leafId : string | undefined ;
195+ for ( const line of raw . split ( / \r ? \n / ) ) {
196+ if ( ! line . trim ( ) ) {
197+ continue ;
198+ }
199+ let parsed : unknown ;
200+ try {
201+ parsed = JSON . parse ( line ) ;
202+ } catch {
203+ output . push ( line ) ;
204+ continue ;
205+ }
206+ if ( ! parsed || typeof parsed !== "object" || Array . isArray ( parsed ) ) {
207+ output . push ( line ) ;
208+ continue ;
209+ }
210+ const record = parsed as Record < string , unknown > ;
211+ if ( record . type === "session" ) {
212+ output . push ( JSON . stringify ( { ...record , version : CURRENT_SESSION_VERSION } ) ) ;
213+ continue ;
214+ }
215+ const id = normalizeEntryId ( record . id ) ?? generateEntryId ( existingIds ) ;
216+ existingIds . add ( id ) ;
217+ record . id = id ;
218+ if ( ! Object . hasOwn ( record , "parentId" ) ) {
219+ record . parentId = previousId ;
220+ }
221+ previousId = id ;
222+ leafId = id ;
223+ output . push ( JSON . stringify ( record ) ) ;
224+ }
225+ await fs . writeFile ( transcriptPath , `${ output . join ( "\n" ) } \n` , {
226+ encoding : "utf-8" ,
227+ mode : 0o600 ,
228+ } ) ;
229+ const result : { leafId ?: string } = { } ;
230+ if ( leafId ) {
231+ result . leafId = leafId ;
232+ }
233+ return result ;
234+ }
235+
236+ function normalizeEntryId ( value : unknown ) : string | undefined {
237+ return typeof value === "string" && value . trim ( ) . length > 0 ? value : undefined ;
238+ }
239+
240+ function generateEntryId ( existingIds : Set < string > ) : string {
241+ for ( let attempt = 0 ; attempt < 100 ; attempt += 1 ) {
242+ const id = randomUUID ( ) . slice ( 0 , 8 ) ;
243+ if ( ! existingIds . has ( id ) ) {
244+ existingIds . add ( id ) ;
245+ return id ;
246+ }
247+ }
248+ const id = randomUUID ( ) ;
249+ existingIds . add ( id ) ;
250+ return id ;
251+ }
252+
253+ function lineHasNonSessionEntry ( line : string ) : boolean {
254+ if ( ! line . trim ( ) ) {
255+ return false ;
256+ }
257+ try {
258+ const parsed = JSON . parse ( line ) as { type ?: unknown } ;
259+ return parsed . type !== "session" ;
260+ } catch {
261+ return false ;
262+ }
263+ }
264+
265+ function lineParentLinkedEntryId ( line : string ) : string | undefined {
266+ if ( ! line . trim ( ) ) {
267+ return undefined ;
268+ }
269+ try {
270+ const parsed = JSON . parse ( line ) as { type ?: unknown ; id ?: unknown ; parentId ?: unknown } ;
271+ return parsed . type !== "session" && typeof parsed . id === "string" && "parentId" in parsed
272+ ? parsed . id
273+ : undefined ;
274+ } catch {
275+ return undefined ;
276+ }
277+ }
278+
279+ async function yieldTranscriptAppendScan ( ) : Promise < void > {
280+ await new Promise < void > ( ( resolve ) => setImmediate ( resolve ) ) ;
281+ }
282+
74283async function readTranscriptIdempotencyKeys ( sessionFile : string ) : Promise < Set < string > > {
75284 const keys = new Set < string > ( ) ;
76285 let raw : string ;
0 commit comments