Swap out processors dynamically without interrupting the data flow #5327
Open
Labels: enhancement (New feature or request)
Description
Is your feature request related to a problem? Please describe.
Currently, updating processors in a Data Prepper pipeline requires stopping and restarting the pipeline, which can lead to data loss or processing delays. We need a mechanism to update processor configurations or swap out processors dynamically without interrupting the data flow.
Describe the solution you'd like
Implement a system for hot-swapping processors and updating their configurations on-the-fly, ensuring continuous data processing without pipeline restarts.
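The proposal concentrates on swapping whole processors. When only a processor's configuration changes, one option is to keep the processor instance and atomically replace an immutable configuration object. The sketch below assumes a hypothetical "add a field" processor. `AddFieldConfig` and `ReconfigurableAddFieldProcessor` are illustrative names, not Data Prepper API, events are modeled as plain maps rather than `Event` objects, and records require Java 16+.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical types for illustration; not part of the Data Prepper API.
// Immutable configuration for an "add a field" processor.
record AddFieldConfig(String key, String value) {
    AddFieldConfig {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(value, "value");
    }
}

// Processor whose configuration can be replaced while it keeps running.
final class ReconfigurableAddFieldProcessor {
    private final AtomicReference<AddFieldConfig> config;

    ReconfigurableAddFieldProcessor(final AddFieldConfig initial) {
        this.config = new AtomicReference<>(Objects.requireNonNull(initial, "initial"));
    }

    // Publish a new configuration; returns the one it replaced.
    AddFieldConfig updateConfig(final AddFieldConfig newConfig) {
        return config.getAndSet(Objects.requireNonNull(newConfig, "newConfig"));
    }

    // Read the configuration once, so one event never sees a mix of old and new settings.
    Map<String, String> process(final Map<String, String> event) {
        final AddFieldConfig current = config.get();
        final Map<String, String> out = new HashMap<>(event);
        out.put(current.key(), current.value());
        return out;
    }
}
```

Because the configuration is immutable and read once per call, each event is handled entirely under either the old or the new settings, and no pipeline restart is involved.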
- ProcessorRegistry
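```java
import java.util.List;
import java.util.Objects;
import org.opensearch.dataprepper.model.processor.Processor;

public class ProcessorRegistry {
    // volatile: a swap becomes visible to all worker threads immediately
    private volatile List<Processor> processors;

    public ProcessorRegistry(final List<Processor> initialProcessors) {
        // List.copyOf makes an immutable defensive copy (and rejects null elements)
        this.processors = List.copyOf(initialProcessors);
    }

    // Atomic swap of the entire processor list: readers see either the old
    // list or the new one, never a partially updated list
    public void swapProcessors(final List<Processor> newProcessors) {
        Objects.requireNonNull(newProcessors, "New processors list cannot be null");
        this.processors = List.copyOf(newProcessors);
    }

    // Current processors for execution; the list is immutable, so callers
    // cannot modify the registry's state through it
    public List<Processor> getProcessors() {
        return processors;
    }
}
```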
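The registry relies on one property: a swap publishes a new list instead of mutating the list that workers are iterating over. A minimal generic sketch of that copy-on-write pattern follows, assuming `AtomicReference` and `List.copyOf` (Java 10+) in place of a raw `volatile` field. `SnapshotRegistry` is a hypothetical name, and `T` stands in for Data Prepper's `Processor`.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical generic stand-in for ProcessorRegistry; T plays the role of Processor.
final class SnapshotRegistry<T> {
    // AtomicReference gives the same visibility guarantee as a volatile field
    // and also returns the previous value on swap.
    private final AtomicReference<List<T>> current;

    SnapshotRegistry(final List<T> initial) {
        // List.copyOf makes an immutable copy and rejects null elements.
        this.current = new AtomicReference<>(List.copyOf(initial));
    }

    // Replace the whole list and return the old one; readers holding an
    // older snapshot keep using it unchanged.
    List<T> swap(final List<T> replacement) {
        Objects.requireNonNull(replacement, "replacement");
        return current.getAndSet(List.copyOf(replacement));
    }

    // Immutable snapshot; callers cannot corrupt the registry by mutating it.
    List<T> snapshot() {
        return current.get();
    }
}
```

Returning the replaced list from `swap` gives the caller a handle on the old processors, which it will need if they have to be shut down later.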
- ProcessWorker class that uses the registry
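```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
// plus Data Prepper API imports: Buffer, CheckpointState, Event, Processor, Record

public class ProcessWorker {
    private final ProcessorRegistry processorRegistry;
    private final Buffer readBuffer;
    // ... other fields (pipeline, acknowledgementsEnabled, LOG, ...)

    private void doRun() {
        final Map.Entry<Collection, CheckpointState> readResult = readBuffer.read(pipeline.getReadBatchTimeoutInMillis());
        Collection records = readResult.getKey();
        final CheckpointState checkpointState = readResult.getValue();

        // Take one snapshot of the processor list per batch, so a swap that happens
        // mid-batch never sends part of a batch through the old processors and
        // part through the new ones
        final List<Processor> currentProcessors = processorRegistry.getProcessors();
        for (final Processor processor : currentProcessors) {
            List<Event> inputEvents = null;
            if (acknowledgementsEnabled) {
                // Cast to Collection, not List: a processor may return any Collection type
                inputEvents = ((Collection<Record<Event>>) records).stream()
                        .map(Record::getData)
                        .collect(Collectors.toList());
            }
            try {
                records = processor.execute(records);
                if (inputEvents != null) {
                    processAcknowledgements(inputEvents, records);
                }
            } catch (final Exception e) {
                LOG.error("Processor threw an exception. This batch of Events will be dropped.", e);
                if (inputEvents != null) {
                    // Release the acknowledgements of every event in the failed batch
                    processAcknowledgements(inputEvents, Collections.emptyList());
                }
                records = Collections.emptyList();
                break;
            }
        }
        postToSink(records);
        // Checkpoint only after the batch has been processed and posted to the sinks
        readBuffer.checkpoint(checkpointState);
    }
}
```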
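The important detail in `doRun()` is that the processor list is read once per batch. The following small, generic model shows that behavior. It simplifies a processor to a `UnaryOperator` over a batch, which is an assumption: Data Prepper processors implement `execute(Collection)`. `BatchWorker` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for ProcessWorker: a "processor" is any function
// from a batch of records to a batch of records.
final class BatchWorker<R> {
    private final AtomicReference<List<UnaryOperator<List<R>>>> chain;

    BatchWorker(final List<UnaryOperator<List<R>>> initial) {
        this.chain = new AtomicReference<>(List.copyOf(initial));
    }

    // Publish a new processor chain; takes effect at the next batch.
    void swap(final List<UnaryOperator<List<R>>> replacement) {
        chain.set(List.copyOf(replacement));
    }

    // Read the chain exactly once per batch, so every record in a batch goes
    // through the same processors even if swap() runs while the batch is in flight.
    List<R> processBatch(final List<R> batch) {
        final List<UnaryOperator<List<R>>> snapshot = chain.get();
        List<R> records = batch;
        for (final UnaryOperator<List<R>> processor : snapshot) {
            records = processor.apply(records);
        }
        return records;
    }
}
```

For example, if one processor triggers a swap while a batch is in flight, the rest of that batch still runs through the old list, and only the next batch sees the new one.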
- Manager class to handle processor updates
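```java
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineManager {
    private static final Logger LOG = LoggerFactory.getLogger(PipelineManager.class);
    private final ProcessorRegistry processorRegistry;

    public PipelineManager(final ProcessorRegistry processorRegistry) {
        this.processorRegistry = Objects.requireNonNull(processorRegistry, "processorRegistry");
    }

    public void updateProcessors(final List<Processor> newProcessors) {
        try {
            validateProcessors(newProcessors);
            processorRegistry.swapProcessors(newProcessors);
            LOG.info("Successfully updated processors");
            // The replaced processors are not shut down here; batches that already
            // took a snapshot may still be using them
        } catch (final Exception e) {
            LOG.error("Failed to update processors", e);
            // ProcessorUpdateException: hypothetical unchecked exception proposed by this issue
            throw new ProcessorUpdateException("Failed to update processors", e);
        }
    }

    private void validateProcessors(final List<Processor> processors) {
        if (processors == null || processors.isEmpty()) {
            throw new IllegalArgumentException("Processors list cannot be null or empty");
        }
        // Add any additional validation logic
    }
}
```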
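After a swap, the old processors may still be in use by a worker that is partway through a batch. This means they cannot be shut down immediately. One way to retire them safely is per-generation reference counting. A worker acquires the current generation at the start of a batch and releases it at the end. The old generation's shutdown hook runs once that generation is both retired and idle. This is a sketch under stated assumptions: `Generation`, `DrainingRegistry`, and the `onShutdown` hook are hypothetical names. In Data Prepper, the hook might call each old processor's `shutdown()` method.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical: one published version of the processor list plus retirement bookkeeping.
final class Generation<T> {
    final List<T> processors;
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicBoolean retired = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();

    Generation(final List<T> processors) {
        this.processors = List.copyOf(processors);
    }

    // Run the shutdown hook exactly once, and only when retired and idle.
    void closeIfDrained(final Consumer<List<T>> onShutdown) {
        if (retired.get() && inFlight.get() == 0 && closed.compareAndSet(false, true)) {
            onShutdown.accept(processors);
        }
    }
}

// Hypothetical registry that shuts down old processors only after every
// batch that started with them has finished.
final class DrainingRegistry<T> {
    private final AtomicReference<Generation<T>> current;
    private final Consumer<List<T>> onShutdown;

    DrainingRegistry(final List<T> initial, final Consumer<List<T>> onShutdown) {
        this.current = new AtomicReference<>(new Generation<>(initial));
        this.onShutdown = onShutdown;
    }

    // Worker: call at the start of a batch.
    Generation<T> acquire() {
        while (true) {
            final Generation<T> g = current.get();
            g.inFlight.incrementAndGet();
            if (current.get() == g) {
                return g; // still current, so it cannot have been shut down
            }
            // A swap raced with the increment; undo it and retry with the new generation.
            release(g);
        }
    }

    // Worker: call when the batch is done.
    void release(final Generation<T> g) {
        g.inFlight.decrementAndGet();
        g.closeIfDrained(onShutdown);
    }

    // Manager: publish new processors; the old ones close once their batches drain.
    void swap(final List<T> replacement) {
        final Generation<T> old = current.getAndSet(new Generation<>(replacement));
        old.retired.set(true);
        old.closeIfDrained(onShutdown);
    }
}
```

The re-check in `acquire()` matters. Without it, a worker could read a generation just before it is retired and closed, and then run a batch through processors that have already been shut down.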
Describe alternatives you've considered (Optional)
- Implementing a copy-on-write approach for the entire processor chain.
- Using a message queue/buffering between processors to allow for more flexible updates.
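The second alternative places a queue in front of each processor so that one stage can be replaced while upstream stages keep producing. Below is a minimal single-stage sketch of that idea. `QueuedStage` is a hypothetical name, the queue is an in-memory `LinkedBlockingQueue` rather than a Data Prepper `Buffer`, and each record is processed individually for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical pipeline stage: records wait in a queue in front of the stage,
// so its processor can be replaced while upstream keeps producing.
final class QueuedStage<R> {
    private final BlockingQueue<R> input = new LinkedBlockingQueue<>();
    private final AtomicReference<UnaryOperator<R>> processor;

    QueuedStage(final UnaryOperator<R> initial) {
        this.processor = new AtomicReference<>(initial);
    }

    // Upstream side: enqueue a record (the queue is unbounded in this sketch).
    void offer(final R record) {
        input.add(record);
    }

    // Replace only this stage's processor; records already queued are kept.
    void replaceProcessor(final UnaryOperator<R> replacement) {
        processor.set(replacement);
    }

    // Worker side: take up to maxBatch queued records and process them
    // with a single snapshot of the current processor.
    List<R> drain(final int maxBatch) {
        final List<R> batch = new ArrayList<>();
        input.drainTo(batch, maxBatch);
        final UnaryOperator<R> current = processor.get();
        final List<R> out = new ArrayList<>(batch.size());
        for (final R record : batch) {
            out.add(current.apply(record));
        }
        return out;
    }
}
```

One trade-off of this design is that records already waiting in the queue at swap time are handled by the new processor, not the one that was active when they were enqueued.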
Status: Unplanned