Hello Spring Batch Team
I found an issue regarding the new Local Chunking introduced in version 6.0.
Bug description
When a method is called on a @StepScope bean inside a ChunkProcessor, a ScopeNotActiveException is thrown. There is also an issue for beans that implement org.springframework.beans.factory.DisposableBean: Spring tries to call the destroy method but fails for the same reason. This happens silently, and no stack trace is printed.
Environment
Spring Batch version: 6.0.3.
Steps to reproduce
I have created a test configuration and a test case that can be copied into the samples. The test class contains two tests:
- One with a fix
- One that shows the bug
The fix is similar to #5183.
package org.springframework.batch.samples.chunking.local;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.StepSynchronizationManager;
import org.springframework.batch.core.step.Step;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.infrastructure.item.ItemReader;
import org.springframework.batch.integration.chunk.ChunkTaskExecutorItemWriter;
import org.springframework.batch.samples.common.DataSourceConfiguration;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.support.ScopeNotActiveException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Iterator;
import java.util.stream.IntStream;

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class LocalChunkingStepScopeJobConfiguration {

    @Bean
    public Job job(JobRepository jobRepository, Step chunkingStep) {
        return new JobBuilder("job", jobRepository).start(chunkingStep).build();
    }

    @Bean
    public Step chunkingStep(JobRepository jobRepository,
            JdbcTransactionManager transactionManager,
            ItemReader<Integer> itemReader,
            ChunkTaskExecutorItemWriter<Integer> itemWriter) {
        return new StepBuilder("chunkingStep", jobRepository).<Integer, Integer>chunk(2)
            .transactionManager(transactionManager)
            .reader(itemReader)
            .writer(itemWriter)
            .build();
    }

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ItemReader<>() {
            private final Iterator<Integer> data = IntStream.range(0, 10).boxed().toList().iterator();

            @Override
            public Integer read() {
                if (data.hasNext()) {
                    return data.next();
                } else {
                    return null;
                }
            }
        };
    }

    @Bean
    @StepScope
    public ChunkTaskExecutorItemWriter<Integer> itemWriter(ChunkProcessor<Integer> chunkProcessor,
            @Value("#{jobParameters['enableFix']}") boolean enableFix) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setThreadNamePrefix("worker-thread-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.afterPropertiesSet();
        if (enableFix) {
            return new ChunkTaskExecutorItemWriter<>((chunk, c) -> {
                // This is the fix similar to https://github.com/spring-projects/spring-batch/issues/5183
                StepSynchronizationManager.register(c.getStepExecution());
                try {
                    chunkProcessor.process(chunk, c);
                } finally {
                    StepSynchronizationManager.close();
                }
            }, taskExecutor);
        } else {
            return new ChunkTaskExecutorItemWriter<>(((chunk, contribution) -> {
                chunkProcessor.process(chunk, contribution);
            }), taskExecutor);
        }
    }

    @Bean
    public ChunkProcessor<Integer> chunkProcessor(@Qualifier("MyDisposableBean") DisposableBean bean) {
        return (chunk, c) -> {
            try {
                System.out.println("Call bean.toString() " + bean.toString());
            } catch (ScopeNotActiveException e) {
                System.out.println("ScopeNotActiveException caught: " + e.getMessage());
            }
            chunk.forEach(item -> System.out.println("Processing item: " + item + " in thread: " + Thread.currentThread().getName()));
        };
    }

    @Bean
    @StepScope
    @Qualifier("MyDisposableBean")
    public DisposableBean myDisposableBean() {
        return new DisposableBean() {
            @Override
            public void destroy() throws Exception {
                System.out.println("DisposeableBean is being destroyed");
            }

            @Override
            public String toString() {
                return "toString called";
            }
        };
    }

}
Test class
package org.springframework.batch.samples.chunking.local;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class LocalChunkingStepScopeJobFunctionalTest {

    @Test
    public void testLaunchFixEnabled() throws Exception {
        // given
        ApplicationContext context = new AnnotationConfigApplicationContext(LocalChunkingStepScopeJobConfiguration.class);
        JobOperator jobOperator = context.getBean(JobOperator.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParametersBuilder()
            .addString("enableFix", "true")
            .toJobParameters();
        jobOperator.start(job, jobParameters);
    }

    @Test
    public void testLaunchFixNotEnabled() throws Exception {
        // given
        ApplicationContext context = new AnnotationConfigApplicationContext(LocalChunkingStepScopeJobConfiguration.class);
        JobOperator jobOperator = context.getBean(JobOperator.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParametersBuilder()
            .addString("enableFix", "false")
            .toJobParameters();
        jobOperator.start(job, jobParameters);
    }

}
Expected behavior
The logged output should look like this:
Call bean.toString() toString called
Call bean.toString() toString called
Processing item: 4 in thread: worker-thread-3
Processing item: 5 in thread: worker-thread-3
...
DisposeableBean is being destroyed
Currently it looks like this:
ScopeNotActiveException caught: Error creating bean with name 'scopedTarget.myDisposableBean': Scope 'step' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton
Processing item: 0 in thread: worker-thread-1
....
As you can see, "DisposeableBean is being destroyed" is never printed.
Proposed change in ChunkTaskExecutorItemWriter
public ChunkTaskExecutorItemWriter(ChunkProcessor<T> chunkRequestProcessor, TaskExecutor taskExecutor) {
    this.taskExecutor = taskExecutor;
    this.chunkProcessorChunkHandler.setChunkProcessor(wrapInStepScope(chunkRequestProcessor));
}

private static <T> ChunkProcessor<T> wrapInStepScope(ChunkProcessor<T> chunkProcessor) {
    return (chunk, contribution) -> {
        StepSynchronizationManager.register(contribution.getStepExecution());
        try {
            chunkProcessor.process(chunk, contribution);
        } finally {
            StepSynchronizationManager.close();
        }
    };
}
I currently use wrapInStepScope as a workaround in my project and pass the wrapped chunkProcessor to ChunkTaskExecutorItemWriter, so the same approach should also work if applied directly inside ChunkTaskExecutorItemWriter.
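For context, here is a minimal plain-Java sketch of the mechanism as I understand it. It does not use Spring Batch at all: ThreadBoundScopeSketch, CURRENT_STEP, resolveScopedBean, withStepContext and runOnWorker are hypothetical names made up for illustration, and a ThreadLocal stands in for the step context on the assumption that StepSynchronizationManager binds the context to the calling thread. A context bound on the launching thread is not visible on a worker thread, whereas a register/close style wrapper makes it available there.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Illustration only: a ThreadLocal stands in for the thread-bound step context.
final class ThreadBoundScopeSketch {

    // Hypothetical stand-in for the step context holder (assumed to be per thread).
    public static final ThreadLocal<String> CURRENT_STEP = new ThreadLocal<>();

    // Stand-in for resolving a step-scoped bean: fails if no context is bound
    // to the thread that performs the lookup.
    public static String resolveScopedBean() {
        String step = CURRENT_STEP.get();
        if (step == null) {
            throw new IllegalStateException("Scope 'step' is not active for the current thread");
        }
        return "bean for " + step;
    }

    // Same shape as wrapInStepScope: bind the context on the executing thread,
    // run the task, and always unbind afterwards.
    public static Supplier<String> withStepContext(String step, Supplier<String> task) {
        return () -> {
            CURRENT_STEP.set(step);
            try {
                return task.get();
            } finally {
                CURRENT_STEP.remove();
            }
        };
    }

    // Runs the task on a fresh worker thread and reports a failed lookup as text.
    public static String runOnWorker(Supplier<String> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.get();
                } catch (IllegalStateException e) {
                    return "failed: " + e.getMessage();
                }
            }, executor).join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Bound on the main thread only, like the context of the thread running the step.
        CURRENT_STEP.set("chunkingStep");
        // Lookup on the worker thread without the wrapper: the context is missing.
        System.out.println(runOnWorker(ThreadBoundScopeSketch::resolveScopedBean));
        // Lookup on the worker thread with the wrapper: the context is available.
        System.out.println(runOnWorker(
                withStepContext("chunkingStep", ThreadBoundScopeSketch::resolveScopedBean)));
        CURRENT_STEP.remove();
    }
}
```

The first call reports the failed lookup and the second returns the bean, which mirrors the difference between the two tests above (enableFix=false vs. enableFix=true).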