@@ -19,6 +19,35 @@ export type CronRunLogEntry = {
1919 nextRunAtMs ?: number ;
2020} & CronRunTelemetry ;
2121
22+ export type CronRunLogSortDir = "asc" | "desc" ;
23+ export type CronRunLogStatusFilter = "all" | "ok" | "error" | "skipped" ;
24+
25+ export type ReadCronRunLogPageOptions = {
26+ limit ?: number ;
27+ offset ?: number ;
28+ jobId ?: string ;
29+ status ?: CronRunLogStatusFilter ;
30+ statuses ?: CronRunStatus [ ] ;
31+ deliveryStatus ?: CronDeliveryStatus ;
32+ deliveryStatuses ?: CronDeliveryStatus [ ] ;
33+ query ?: string ;
34+ sortDir ?: CronRunLogSortDir ;
35+ } ;
36+
37+ export type CronRunLogPageResult = {
38+ entries : CronRunLogEntry [ ] ;
39+ total : number ;
40+ offset : number ;
41+ limit : number ;
42+ hasMore : boolean ;
43+ nextOffset : number | null ;
44+ } ;
45+
46+ type ReadCronRunLogAllPageOptions = Omit < ReadCronRunLogPageOptions , "jobId" > & {
47+ storePath : string ;
48+ jobNameById ?: Record < string , string > ;
49+ } ;
50+
2251function assertSafeCronRunLogJobId ( jobId : string ) : string {
2352 const trimmed = jobId . trim ( ) ;
2453 if ( ! trimmed ) {
@@ -98,14 +127,78 @@ export async function readCronRunLogEntries(
98127 opts ?: { limit ?: number ; jobId ?: string } ,
99128) : Promise < CronRunLogEntry [ ] > {
100129 const limit = Math . max ( 1 , Math . min ( 5000 , Math . floor ( opts ?. limit ?? 200 ) ) ) ;
130+ const page = await readCronRunLogEntriesPage ( filePath , {
131+ jobId : opts ?. jobId ,
132+ limit,
133+ offset : 0 ,
134+ status : "all" ,
135+ sortDir : "desc" ,
136+ } ) ;
137+ return page . entries . toReversed ( ) ;
138+ }
139+
140+ function normalizeRunStatusFilter ( status ?: string ) : CronRunLogStatusFilter {
141+ if ( status === "ok" || status === "error" || status === "skipped" || status === "all" ) {
142+ return status ;
143+ }
144+ return "all" ;
145+ }
146+
147+ function normalizeRunStatuses ( opts ?: {
148+ statuses ?: CronRunStatus [ ] ;
149+ status ?: CronRunLogStatusFilter ;
150+ } ) : CronRunStatus [ ] | null {
151+ if ( Array . isArray ( opts ?. statuses ) && opts . statuses . length > 0 ) {
152+ const filtered = opts . statuses . filter (
153+ ( status ) : status is CronRunStatus =>
154+ status === "ok" || status === "error" || status === "skipped" ,
155+ ) ;
156+ if ( filtered . length > 0 ) {
157+ return Array . from ( new Set ( filtered ) ) ;
158+ }
159+ }
160+ const status = normalizeRunStatusFilter ( opts ?. status ) ;
161+ if ( status === "all" ) {
162+ return null ;
163+ }
164+ return [ status ] ;
165+ }
166+
167+ function normalizeDeliveryStatuses ( opts ?: {
168+ deliveryStatuses ?: CronDeliveryStatus [ ] ;
169+ deliveryStatus ?: CronDeliveryStatus ;
170+ } ) : CronDeliveryStatus [ ] | null {
171+ if ( Array . isArray ( opts ?. deliveryStatuses ) && opts . deliveryStatuses . length > 0 ) {
172+ const filtered = opts . deliveryStatuses . filter (
173+ ( status ) : status is CronDeliveryStatus =>
174+ status === "delivered" ||
175+ status === "not-delivered" ||
176+ status === "unknown" ||
177+ status === "not-requested" ,
178+ ) ;
179+ if ( filtered . length > 0 ) {
180+ return Array . from ( new Set ( filtered ) ) ;
181+ }
182+ }
183+ if (
184+ opts ?. deliveryStatus === "delivered" ||
185+ opts ?. deliveryStatus === "not-delivered" ||
186+ opts ?. deliveryStatus === "unknown" ||
187+ opts ?. deliveryStatus === "not-requested"
188+ ) {
189+ return [ opts . deliveryStatus ] ;
190+ }
191+ return null ;
192+ }
193+
194+ function parseAllRunLogEntries ( raw : string , opts ?: { jobId ?: string } ) : CronRunLogEntry [ ] {
101195 const jobId = opts ?. jobId ?. trim ( ) || undefined ;
102- const raw = await fs . readFile ( path . resolve ( filePath ) , "utf-8" ) . catch ( ( ) => "" ) ;
103196 if ( ! raw . trim ( ) ) {
104197 return [ ] ;
105198 }
106199 const parsed : CronRunLogEntry [ ] = [ ] ;
107200 const lines = raw . split ( "\n" ) ;
108- for ( let i = lines . length - 1 ; i >= 0 && parsed . length < limit ; i -- ) {
201+ for ( let i = 0 ; i < lines . length ; i ++ ) {
109202 const line = lines [ i ] ?. trim ( ) ;
110203 if ( ! line ) {
111204 continue ;
@@ -182,5 +275,125 @@ export async function readCronRunLogEntries(
182275 // ignore invalid lines
183276 }
184277 }
185- return parsed . toReversed ( ) ;
278+ return parsed ;
279+ }
280+
281+ export async function readCronRunLogEntriesPage (
282+ filePath : string ,
283+ opts ?: ReadCronRunLogPageOptions ,
284+ ) : Promise < CronRunLogPageResult > {
285+ const limit = Math . max ( 1 , Math . min ( 200 , Math . floor ( opts ?. limit ?? 50 ) ) ) ;
286+ const raw = await fs . readFile ( path . resolve ( filePath ) , "utf-8" ) . catch ( ( ) => "" ) ;
287+ const statuses = normalizeRunStatuses ( opts ) ;
288+ const deliveryStatuses = normalizeDeliveryStatuses ( opts ) ;
289+ const query = opts ?. query ?. trim ( ) . toLowerCase ( ) ?? "" ;
290+ const sortDir : CronRunLogSortDir = opts ?. sortDir === "asc" ? "asc" : "desc" ;
291+ const all = parseAllRunLogEntries ( raw , { jobId : opts ?. jobId } ) ;
292+ const filtered = all . filter ( ( entry ) => {
293+ if ( statuses && ( ! entry . status || ! statuses . includes ( entry . status ) ) ) {
294+ return false ;
295+ }
296+ if ( deliveryStatuses ) {
297+ const deliveryStatus = entry . deliveryStatus ?? "not-requested" ;
298+ if ( ! deliveryStatuses . includes ( deliveryStatus ) ) {
299+ return false ;
300+ }
301+ }
302+ if ( ! query ) {
303+ return true ;
304+ }
305+ const haystack = [ entry . summary ?? "" , entry . error ?? "" , entry . jobId ] . join ( " " ) . toLowerCase ( ) ;
306+ return haystack . includes ( query ) ;
307+ } ) ;
308+ const sorted =
309+ sortDir === "asc"
310+ ? filtered . toSorted ( ( a , b ) => a . ts - b . ts )
311+ : filtered . toSorted ( ( a , b ) => b . ts - a . ts ) ;
312+ const total = sorted . length ;
313+ const offset = Math . max ( 0 , Math . min ( total , Math . floor ( opts ?. offset ?? 0 ) ) ) ;
314+ const entries = sorted . slice ( offset , offset + limit ) ;
315+ const nextOffset = offset + entries . length ;
316+ return {
317+ entries,
318+ total,
319+ offset,
320+ limit,
321+ hasMore : nextOffset < total ,
322+ nextOffset : nextOffset < total ? nextOffset : null ,
323+ } ;
324+ }
325+
326+ export async function readCronRunLogEntriesPageAll (
327+ opts : ReadCronRunLogAllPageOptions ,
328+ ) : Promise < CronRunLogPageResult > {
329+ const limit = Math . max ( 1 , Math . min ( 200 , Math . floor ( opts . limit ?? 50 ) ) ) ;
330+ const statuses = normalizeRunStatuses ( opts ) ;
331+ const deliveryStatuses = normalizeDeliveryStatuses ( opts ) ;
332+ const query = opts . query ?. trim ( ) . toLowerCase ( ) ?? "" ;
333+ const sortDir : CronRunLogSortDir = opts . sortDir === "asc" ? "asc" : "desc" ;
334+ const runsDir = path . resolve ( path . dirname ( path . resolve ( opts . storePath ) ) , "runs" ) ;
335+ const files = await fs . readdir ( runsDir , { withFileTypes : true } ) . catch ( ( ) => [ ] ) ;
336+ const jsonlFiles = files
337+ . filter ( ( entry ) => entry . isFile ( ) && entry . name . endsWith ( ".jsonl" ) )
338+ . map ( ( entry ) => path . join ( runsDir , entry . name ) ) ;
339+ if ( jsonlFiles . length === 0 ) {
340+ return {
341+ entries : [ ] ,
342+ total : 0 ,
343+ offset : 0 ,
344+ limit,
345+ hasMore : false ,
346+ nextOffset : null ,
347+ } ;
348+ }
349+ const chunks = await Promise . all (
350+ jsonlFiles . map ( async ( filePath ) => {
351+ const raw = await fs . readFile ( filePath , "utf-8" ) . catch ( ( ) => "" ) ;
352+ return parseAllRunLogEntries ( raw ) ;
353+ } ) ,
354+ ) ;
355+ const all = chunks . flat ( ) ;
356+ const filtered = all . filter ( ( entry ) => {
357+ if ( statuses && ( ! entry . status || ! statuses . includes ( entry . status ) ) ) {
358+ return false ;
359+ }
360+ if ( deliveryStatuses ) {
361+ const deliveryStatus = entry . deliveryStatus ?? "not-requested" ;
362+ if ( ! deliveryStatuses . includes ( deliveryStatus ) ) {
363+ return false ;
364+ }
365+ }
366+ if ( ! query ) {
367+ return true ;
368+ }
369+ const jobName = opts . jobNameById ?. [ entry . jobId ] ?? "" ;
370+ const haystack = [ entry . summary ?? "" , entry . error ?? "" , entry . jobId , jobName ]
371+ . join ( " " )
372+ . toLowerCase ( ) ;
373+ return haystack . includes ( query ) ;
374+ } ) ;
375+ const sorted =
376+ sortDir === "asc"
377+ ? filtered . toSorted ( ( a , b ) => a . ts - b . ts )
378+ : filtered . toSorted ( ( a , b ) => b . ts - a . ts ) ;
379+ const total = sorted . length ;
380+ const offset = Math . max ( 0 , Math . min ( total , Math . floor ( opts . offset ?? 0 ) ) ) ;
381+ const entries = sorted . slice ( offset , offset + limit ) ;
382+ if ( opts . jobNameById ) {
383+ for ( const entry of entries ) {
384+ const jobName = opts . jobNameById [ entry . jobId ] ;
385+ if ( jobName ) {
386+ ( entry as CronRunLogEntry & { jobName ?: string } ) . jobName = jobName ;
387+ }
388+ }
389+ }
390+ const nextOffset = offset + entries . length ;
391+ return {
392+ entries,
393+ total,
394+ offset,
395+ limit,
396+ hasMore : nextOffset < total ,
397+ nextOffset : nextOffset < total ? nextOffset : null ,
398+ } ;
186399}
0 commit comments