Skip to content

Commit 7b5c7e1

Browse files
committed
refactor(processor): migrate processWithFilters to runFilterGraph
- Replace inline frame loop with runFilterGraph call - Move level calculation and output measurement into OnFrame callback - Move frame counting and progress reporting into OnInputFrame callback - Remove manual filteredFrame allocation (now managed by runFilterGraph) - Remove unused errors import - Configure strict error handling with read error propagation Eliminates ~100 lines of duplicated frame loop code. Third migration of six across the codebase. Maintains exact error handling semantics and callback ordering. Signed-off-by: Martin Wimpress <code@wimpress.io>
1 parent 104d013 commit 7b5c7e1

1 file changed

Lines changed: 21 additions & 71 deletions

File tree

internal/processor/processor.go

Lines changed: 21 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package processor
33

44
import (
5-
"errors"
65
"fmt"
76
"path/filepath"
87
"strings"
@@ -203,10 +202,6 @@ func processWithFilters(inputPath, outputPath string, config *FilterChainConfig,
203202
}
204203
defer encoder.Close()
205204

206-
// Allocate frames for processing
207-
filteredFrame := ffmpeg.AVFrameAlloc()
208-
defer ffmpeg.AVFrameFree(&filteredFrame)
209-
210205
// Initialize output measurement accumulators if output analysis is enabled
211206
var outputAcc *outputMetadataAccumulators
212207
if config.OutputAnalysisEnabled && outputMeasurements != nil {
@@ -217,33 +212,25 @@ func processWithFilters(inputPath, outputPath string, config *FilterChainConfig,
217212
frameCount := 0
218213
currentLevel := 0.0
219214

220-
// Process all frames through the filter chain
221-
for {
222-
// Read frame from input
223-
frame, err := reader.ReadFrame()
224-
if err != nil {
215+
// Process all frames through the filter chain using runFilterGraph
216+
if err := runFilterGraph(reader, bufferSrcCtx, bufferSinkCtx, FrameLoopConfig{
217+
OnReadError: func(err error) error {
225218
return fmt.Errorf("failed to read frame: %w", err)
226-
}
227-
if frame == nil {
228-
break // EOF
229-
}
230-
231-
frameCount++
232-
233-
// Push frame into filter graph
234-
if _, err := ffmpeg.AVBuffersrcAddFrameFlags(bufferSrcCtx, frame, 0); err != nil {
235-
return fmt.Errorf("failed to add frame to filter: %w", err)
236-
}
237-
238-
// Pull filtered frames and encode them
239-
for {
240-
if _, err := ffmpeg.AVBuffersinkGetFrame(bufferSinkCtx, filteredFrame); err != nil {
241-
if errors.Is(err, ffmpeg.EAgain) || errors.Is(err, ffmpeg.AVErrorEOF) {
242-
break
219+
},
220+
OnInputFrame: func(inputFrame *ffmpeg.AVFrame) {
221+
frameCount++
222+
223+
// Send periodic progress updates based on INPUT frame count
224+
updateInterval := 100
225+
if frameCount%updateInterval == 0 && progressCallback != nil && estimatedTotalFrames > 0 {
226+
progress := float64(frameCount) / estimatedTotalFrames
227+
if progress > 1.0 {
228+
progress = 1.0
243229
}
244-
return fmt.Errorf("failed to get filtered frame: %w", err)
230+
progressCallback(PassProcessing, "Processing", progress, currentLevel, measurements)
245231
}
246-
232+
},
233+
OnFrame: func(inputFrame, filteredFrame *ffmpeg.AVFrame) (FrameAction, error) {
247234
// Calculate audio level from FILTERED frame (shows processed audio in VU meter)
248235
currentLevel = calculateFrameLevel(filteredFrame)
249236

@@ -257,50 +244,13 @@ func processWithFilters(inputPath, outputPath string, config *FilterChainConfig,
257244

258245
// Encode and write the filtered frame
259246
if err := encoder.WriteFrame(filteredFrame); err != nil {
260-
return fmt.Errorf("failed to write frame: %w", err)
261-
}
262-
263-
ffmpeg.AVFrameUnref(filteredFrame)
264-
}
265-
266-
// Send periodic progress updates based on INPUT frame count (moved outside inner loop)
267-
// This ensures consistent update frequency between Pass 1 and Pass 2
268-
updateInterval := 100 // Send progress update every N frames
269-
if frameCount%updateInterval == 0 && progressCallback != nil && estimatedTotalFrames > 0 {
270-
progress := float64(frameCount) / estimatedTotalFrames
271-
if progress > 1.0 {
272-
progress = 1.0
247+
return FrameDiscard, fmt.Errorf("failed to write frame: %w", err)
273248
}
274-
progressCallback(PassProcessing, "Processing", progress, currentLevel, measurements)
275-
}
276-
}
277-
278-
// Flush the filter graph
279-
if _, err := ffmpeg.AVBuffersrcAddFrameFlags(bufferSrcCtx, nil, 0); err != nil {
280-
return fmt.Errorf("failed to flush filter: %w", err)
281-
}
282-
283-
// Pull remaining filtered frames
284-
for {
285-
if _, err := ffmpeg.AVBuffersinkGetFrame(bufferSinkCtx, filteredFrame); err != nil {
286-
if errors.Is(err, ffmpeg.EAgain) || errors.Is(err, ffmpeg.AVErrorEOF) {
287-
break
288-
}
289-
return fmt.Errorf("failed to get filtered frame: %w", err)
290-
}
291-
292-
// Extract output measurements from remaining frames
293-
if outputAcc != nil {
294-
extractOutputFrameMetadata(filteredFrame.Metadata(), outputAcc)
295-
}
296-
297-
filteredFrame.SetTimeBase(ffmpeg.AVBuffersinkGetTimeBase(bufferSinkCtx))
298-
299-
if err := encoder.WriteFrame(filteredFrame); err != nil {
300-
return fmt.Errorf("failed to write frame: %w", err)
301-
}
302249

303-
ffmpeg.AVFrameUnref(filteredFrame)
250+
return FrameDiscard, nil
251+
},
252+
}); err != nil {
253+
return err
304254
}
305255

306256
// Flush the encoder

0 commit comments

Comments
 (0)