Skip to content

ScopeNotActiveException with @StepScope and Local Chunking #5398

@spreiter301

Description

@spreiter301

Hello Spring Batch Team

I found an issue regarding the new Local Chunking introduced in version 6.0.

Bug description
When calling a method on a @StepScoped Bean inside a ChunkProcessor a ScopeNotActiveException is thrown. There is also an issue for Beans that implement org.springframework.beans.factory.DisposableBean. Spring will try to call the destroy method but will fail for the same reason. This happens silently and no stacktrace is printed.

Environment
Spring Batch version: 6.0.3.

Steps to reproduce
I have created a test configuration and test case that can be copied into the samples. The test contains 2 tests:

  • One with a fix
  • One that shows the bug

The fix is similar to #5183

package org.springframework.batch.samples.chunking.local;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.StepSynchronizationManager;
import org.springframework.batch.core.step.Step;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.infrastructure.item.ItemReader;
import org.springframework.batch.integration.chunk.ChunkTaskExecutorItemWriter;
import org.springframework.batch.samples.common.DataSourceConfiguration;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.support.ScopeNotActiveException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Iterator;
import java.util.stream.IntStream;

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class LocalChunkingStepScopeJobConfiguration {

    @Bean
    public Job job(JobRepository jobRepository, Step chunkingStep) {
        return new JobBuilder("job", jobRepository).start(chunkingStep).build();
    }

    @Bean
    public Step chunkingStep(JobRepository jobRepository,
                             JdbcTransactionManager transactionManager,
                             ItemReader<Integer> itemReader,
                             ChunkTaskExecutorItemWriter<Integer> itemWriter) {
        return new StepBuilder("chunkingStep", jobRepository).<Integer, Integer>chunk(2)
                .transactionManager(transactionManager)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ItemReader<>() {
            private final Iterator<Integer> data = IntStream.range(0, 10).boxed().toList().iterator();

            @Override
            public Integer read() {
                if (data.hasNext()) {
                    return data.next();
                } else {
                    return null;
                }
            }
        };
    }


    @Bean
    @StepScope
    public ChunkTaskExecutorItemWriter<Integer> itemWriter(ChunkProcessor<Integer> chunkProcessor,
                                                      @Value("#{jobParameters['enableFix']}") boolean enableFix) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setThreadNamePrefix("worker-thread-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.afterPropertiesSet();
        if (enableFix) {
            return new ChunkTaskExecutorItemWriter<>((chunk, c) -> {
                // This is the fix similar to https://github.com/spring-projects/spring-batch/issues/5183
                StepSynchronizationManager.register(c.getStepExecution());
                try {
                    chunkProcessor.process(chunk, c);
                } finally {
                    StepSynchronizationManager.close();
                }
            }, taskExecutor);
        } else {
            return new ChunkTaskExecutorItemWriter<>(((chunk, contribution) -> {
                chunkProcessor.process(chunk, contribution);
            }), taskExecutor);
        }
    }

    @Bean
    public ChunkProcessor<Integer> chunkProcessor(@Qualifier("MyDisposableBean") DisposableBean bean) {
        return (chunk, c) -> {
            try {
                System.out.println("Call bean.toString() " + bean.toString());
            } catch (ScopeNotActiveException e) {
                System.out.println("ScopeNotActiveException caught: " + e.getMessage());
            }
            chunk.forEach(item -> System.out.println("Processing item: " + item + " in thread: " + Thread.currentThread().getName()));
        };
    }

    @Bean
    @StepScope
    @Qualifier("MyDisposableBean")
    public DisposableBean myDisposableBean() {
        return new DisposableBean() {
            @Override
            public void destroy() throws Exception {
                System.out.println("DisposeableBean is being destroyed");
            }

            @Override
            public String toString() {
                return "toString called";
            }
        };
    }
}

Test class

package org.springframework.batch.samples.chunking.local;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.parameters.JobParameters;
import org.springframework.batch.core.job.parameters.JobParametersBuilder;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class LocalChunkingStepScopeJobFunctionalTest {

    @Test
    public void testLaunchFixEnabled() throws Exception {
        // given
        ApplicationContext context = new AnnotationConfigApplicationContext(LocalChunkingStepScopeJobConfiguration.class);
        JobOperator jobOperator = context.getBean(JobOperator.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("enableFix", "true")
                .toJobParameters();

        jobOperator.start(job, jobParameters);
    }

    @Test
    public void testLaunchFixNotEnabled() throws Exception {
        // given
        ApplicationContext context = new AnnotationConfigApplicationContext(LocalChunkingStepScopeJobConfiguration.class);
        JobOperator jobOperator = context.getBean(JobOperator.class);
        Job job = context.getBean(Job.class);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("enableFix", "false")
                .toJobParameters();

        jobOperator.start(job, jobParameters);
    }
}

Expected behavior
The logged output should look like this:

Call bean.toString() toString called
Call bean.toString() toString called
Processing item: 4 in thread: worker-thread-3
Processing item: 5 in thread: worker-thread-3
...
DisposeableBean is being destroyed

Currently it looks like this:

ScopeNotActiveException caught: Error creating bean with name 'scopedTarget.myDisposableBean': Scope 'step' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton
Processing item: 0 in thread: worker-thread-1
....

As you can see DisposeableBean is being destroyed is never printed.

Proposed change in ChunkTaskExecutorItemWriter

	public ChunkTaskExecutorItemWriter(ChunkProcessor<T> chunkRequestProcessor, TaskExecutor taskExecutor) {
		this.taskExecutor = taskExecutor;
		this.chunkProcessorChunkHandler.setChunkProcessor(wrapInStepScope(chunkRequestProcessor));
	}

    private static <T> ChunkProcessor<T> wrapInStepScope(ChunkProcessor<T> chunkProcessor) {
        return (chunk, contribution) -> {
            StepSynchronizationManager.register(contribution.getStepExecution());
            try {
                chunkProcessor.process(chunk, contribution);
            } finally {
                StepSynchronizationManager.close();
            }
        };
    }

I use wrapInStepScope as a workaround in my project to pass the wrapped chunkProcessor to ChunkTaskExecutorItemWriter, so this would also work if done directly inside ChunkTaskExecutorItemWriter.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions