
Swap out processors dynamically without interrupting the data flow #5327

@dinujoh

Is your feature request related to a problem? Please describe.
Currently, updating processors in a Data Prepper pipeline requires stopping and restarting the pipeline, which can lead to data loss or processing delays. We need a mechanism to update processor configurations or swap out processors dynamically without interrupting the data flow.

Describe the solution you'd like

Implement a system for hot-swapping processors and updating their configurations on-the-fly, ensuring continuous data processing without pipeline restarts.

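  1. ProcessorRegistry
import java.util.List;
import java.util.Objects;

public class ProcessorRegistry {
    // volatile: a swap made by one thread is visible to every worker's next read
    private volatile List<Processor> processors;

    public ProcessorRegistry(List<Processor> initialProcessors) {
        Objects.requireNonNull(initialProcessors, "Initial processors list cannot be null");
        // List.copyOf yields an immutable snapshot, so callers cannot mutate a list in use
        this.processors = List.copyOf(initialProcessors);
    }

    // Atomic swap of the entire processor list: publishes a new immutable snapshot
    public void swapProcessors(List<Processor> newProcessors) {
        Objects.requireNonNull(newProcessors, "New processors list cannot be null");
        this.processors = List.copyOf(newProcessors);
    }

    // Get the current immutable snapshot for execution
    public List<Processor> getProcessors() {
        return processors;
    }
}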
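To illustrate what the registry is meant to guarantee, here is a minimal, self-contained sketch. It does not use Data Prepper types: `SnapshotRegistry` and `SnapshotDemo` are hypothetical names, and a `UnaryOperator<String>` stands in for a `Processor`. It only shows that a list obtained before a swap keeps working unchanged, while later reads see the new list.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.UnaryOperator;

/** Illustrative registry: an immutable snapshot behind a volatile reference. */
class SnapshotRegistry {
    private volatile List<UnaryOperator<String>> steps;

    SnapshotRegistry(List<UnaryOperator<String>> initial) {
        this.steps = List.copyOf(initial);
    }

    void swap(List<UnaryOperator<String>> next) {
        this.steps = List.copyOf(Objects.requireNonNull(next));
    }

    List<UnaryOperator<String>> current() {
        return steps;
    }
}

class SnapshotDemo {
    /** Runs every step of one snapshot over the input, in order. */
    static String apply(List<UnaryOperator<String>> snapshot, String input) {
        String value = input;
        for (UnaryOperator<String> step : snapshot) {
            value = step.apply(value);
        }
        return value;
    }

    public static void main(String[] args) {
        UnaryOperator<String> upper = s -> s.toUpperCase();
        UnaryOperator<String> bang = s -> s + "!";

        SnapshotRegistry registry = new SnapshotRegistry(List.of(upper));
        List<UnaryOperator<String>> inFlight = registry.current(); // a worker starts a batch

        registry.swap(List.of(bang)); // an update arrives while that batch is running

        System.out.println(apply(inFlight, "event"));           // prints "EVENT" (old chain)
        System.out.println(apply(registry.current(), "event")); // prints "event!" (new chain)
    }
}
```

The old list is never modified, so nothing needs to be locked on the read path. What this sketch does not address is cleanup of the replaced processors (for example, releasing resources they hold), which the real implementation would still need to decide.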
  2. ProcessWorker class that uses the registry
public class ProcessWorker {
    private final ProcessorRegistry processorRegistry;
    private final Buffer readBuffer;
    // ... other fields

    private void doRun() {
        final Map.Entry<Collection, CheckpointState> readResult = readBuffer.read(pipeline.getReadBatchTimeoutInMillis());
        Collection records = readResult.getKey();
        final CheckpointState checkpointState = readResult.getValue();

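        // Read the processor list once per batch: a swap that happens while this
        // batch is running takes effect on the next batch, never in the middle of one
        final List<Processor> currentProcessors = processorRegistry.getProcessors();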
        
        for (final Processor processor : currentProcessors) {
            List<Event> inputEvents = null;
            if (acknowledgementsEnabled) {
                inputEvents = ((List<Record<Event>>) records).stream()
                    .map(Record::getData)
                    .collect(Collectors.toList());
            }

            try {
                records = processor.execute(records);
                if (inputEvents != null) {
                    processAcknowledgements(inputEvents, records);
                }
            } catch (final Exception e) {
                LOG.error("Processor threw an exception. This batch of Events will be dropped.", e);
                if (inputEvents != null) {
                    processAcknowledgements(inputEvents, Collections.emptyList());
                }
                records = Collections.emptyList();
                break;
            }
        }

        postToSink(records);
        readBuffer.checkpoint(checkpointState);
    }
}
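The key property of the worker loop is that it reads the registry once per batch. The following sketch, with hypothetical names and string transforms in place of processors, checks that property under contention: a second thread swaps between two chains as fast as it can, and every batch result must come entirely from one chain or the other.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;

class BatchSnapshotDemo {
    // Hypothetical stand-in for the registry: a volatile reference to an immutable list.
    private static volatile List<UnaryOperator<String>> chain;

    /** Same shape as doRun(): read the chain once, then use that list for the whole batch. */
    static String runBatch(String record) {
        List<UnaryOperator<String>> snapshot = chain; // one volatile read per batch
        String value = record;
        for (UnaryOperator<String> step : snapshot) {
            value = step.apply(value);
        }
        return value;
    }

    /** Swaps chains from a second thread while batches run; true if no batch mixed chains. */
    static boolean neverMixes(int batches) {
        UnaryOperator<String> appendA = s -> s + "a";
        UnaryOperator<String> appendB = s -> s + "b";
        List<UnaryOperator<String>> chainA = List.of(appendA, appendA);
        List<UnaryOperator<String>> chainB = List.of(appendB, appendB);
        chain = chainA;

        AtomicBoolean running = new AtomicBoolean(true);
        Thread swapper = new Thread(() -> {
            boolean useB = true;
            while (running.get()) {
                chain = useB ? chainB : chainA;
                useB = !useB;
            }
        });
        swapper.start();

        boolean consistent = true;
        for (int i = 0; i < batches; i++) {
            String out = runBatch("");
            // "ab" or "ba" would mean a batch saw steps from two different chains
            consistent &= out.equals("aa") || out.equals("bb");
        }

        running.set(false);
        try {
            swapper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println(neverMixes(100_000)); // prints "true"
    }
}
```

If the loop instead re-read the registry for each processor, a batch could run the first step of one chain and the second step of another, so the single read at the top of `doRun()` matters.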
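  3. Manager class to handle processor updates
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineManager {
    private static final Logger LOG = LoggerFactory.getLogger(PipelineManager.class);
    private final ProcessorRegistry processorRegistry;

    public PipelineManager(ProcessorRegistry processorRegistry) {
        this.processorRegistry = Objects.requireNonNull(processorRegistry);
    }

    public void updateProcessors(List<Processor> newProcessors) {
        try {
            // Validate before swapping so a rejected update leaves the running processors in place
            validateProcessors(newProcessors);
            processorRegistry.swapProcessors(newProcessors);
            LOG.info("Successfully updated processors");
        } catch (Exception e) {
            LOG.error("Failed to update processors", e);
            // ProcessorUpdateException would be a new unchecked exception added with this change
            throw new ProcessorUpdateException("Failed to update processors", e);
        }
    }

    private void validateProcessors(List<Processor> processors) {
        if (processors == null || processors.isEmpty()) {
            throw new IllegalArgumentException("Processors list cannot be null or empty");
        }
        // Add any additional validation logic
    }
}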
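The validate-then-swap order in `updateProcessors` can be shown with a small stand-alone sketch. Everything here is hypothetical: `ValidatedSwapDemo` is not a Data Prepper class, and processor names are plain strings used as placeholders. The point is only that a rejected update leaves the active list exactly as it was.

```java
import java.util.List;

class ValidatedSwapDemo {
    // Placeholder for the registry's current processor list.
    static volatile List<String> active = List.of("grok", "date");

    /** Validates first and publishes second, so a rejected update never touches 'active'. */
    static void update(List<String> candidate) {
        if (candidate == null || candidate.isEmpty()) {
            throw new IllegalArgumentException("Processors list cannot be null or empty");
        }
        active = List.copyOf(candidate); // List.copyOf also rejects null elements
    }

    public static void main(String[] args) {
        try {
            update(List.of()); // invalid: empty list
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(active); // still the original list

        update(List.of("grok", "add_entries")); // valid update
        System.out.println(active);
    }
}
```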

Describe alternatives you've considered (Optional)

  • Implementing a copy-on-write approach for the entire processor chain.
  • Using a message queue or buffer between processors to allow for more flexible updates.
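For comparison, the copy-on-write alternative could be sketched with the JDK's `java.util.concurrent.CopyOnWriteArrayList`, whose iterators work on the array as it was when the iterator was created. This is only an illustration with placeholder processor names, not a proposal for the actual implementation.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteChainDemo {
    /** Joins the remaining elements of an iterator with commas. */
    static String drain(Iterator<String> it) {
        StringBuilder sb = new StringBuilder();
        while (it.hasNext()) {
            sb.append(it.next());
            if (it.hasNext()) {
                sb.append(",");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> chain = new CopyOnWriteArrayList<>(List.of("grok", "date"));

        Iterator<String> inFlight = chain.iterator(); // a batch starts iterating
        chain.set(1, "add_entries");                  // replace one processor in place

        System.out.println(drain(inFlight));          // prints "grok,date"
        System.out.println(drain(chain.iterator()));  // prints "grok,add_entries"
    }
}
```

This makes replacing a single processor convenient, but each `set` or `add` is its own atomic step, so an update that touches several processors could be observed half-applied. Swapping one reference to a whole new list, as in the proposed registry, avoids that.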

Metadata

Labels: enhancement (New feature or request)
Status: Unplanned