1. Overview

In this tutorial, we’ll explore how to accomplish batch processing of Stream data in Java. We’ll see examples using both native Java features and some third-party libraries.

2. What Does Batch Processing of Stream Data Mean?

Batch processing of Stream data in Java refers to the practice of dividing a large dataset into smaller, more manageable chunks and processing them sequentially. In this scenario, the data source for processing comes from a data stream.

This can be advantageous for a variety of reasons, including increasing data-processing efficiency, handling very large datasets that may not fit in memory all at once, and providing a mechanism to process data in parallel using several processors.

However, there are various issues that can occur when implementing batch processing:

  • Setting an acceptable batch size: The overhead of processing each batch may become significant if the batch size is too small. However, if the batch size is too large, processing each batch may take too long, which can cause delays in the stream-processing pipeline.
  • State management: When employing batch processing, it’s often necessary to preserve state between batches, either to keep track of intermediate results or to guarantee that each batch is processed consistently with prior batches. The complexity of working with distributed systems increases the difficulty of state management.
  • Fault tolerance: When processing large datasets in batches, it’s critical to ensure that processing can be resumed if a failure occurs. This can be difficult since it may be necessary to store large amounts of intermediate state.

In this article, for simplicity and clarity, we’ll only focus on the batch processing of Stream data in Java and not on how to address the above-mentioned issues.
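To make the idea concrete before turning to streams, here's a minimal, plain-Java sketch of batching. This is a simplified illustration that assumes the whole dataset is already materialized in a List, which is precisely the constraint that the stream-based approaches below help avoid:

```java
import java.util.ArrayList;
import java.util.List;

public class SubListBatching {
    // Split a fully materialized list into consecutive chunks of at most batchSize elements
    static <T> List<List<T>> chunks(List<T> source, int batchSize) {
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += batchSize) {
            int end = Math.min(start + batchSize, source.size());
            result.add(source.subList(start, end));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 34; i++) {
            data.add(i);
        }
        // 34 elements with a batch size of 10 yield four chunks: 10 + 10 + 10 + 4
        System.out.println(chunks(data, 10).size());
    }
}
```

Note that subList() returns views over the source list rather than copies, which keeps the sketch cheap but ties each chunk's lifetime to the original list.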

3. Batch Processing With Java Streams API

Before we begin, we must note some crucial concepts we’ll work with. First, we have the Streams API, a major feature introduced in Java 8. From the Streams API, we’ll use the Stream class.

In this case, we need to consider that a declared data stream can be consumed only once. If we try to operate on the same data stream a second time, we get an IllegalStateException. A quick example shows us this behavior:

Stream<String> coursesStream = Stream.of("Java", "Frontend", "Backend", "Fullstack");
Stream<Integer> coursesStreamLength = coursesStream.map(String::length);
// we get java.lang.IllegalStateException
Stream<String> emphasisCourses = coursesStream.map(course -> course + "!");

Second, we’ll write most of the examples in the following sections in a functional style. Some of the examples do have side effects (such as adding to an external collection), which we’d normally try to avoid in functional programming.

Before we build our code examples, let’s define our test data stream, batch size, and expected batch result.

Our data stream will be a Stream of Integer values:

Stream<Integer> data = IntStream.range(0, 34).boxed();

Then, our batch size is 10:

private final int BATCH_SIZE = 10;

And finally, let’s define our expected batches:

private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);

Next, let’s look at some examples.

4. Using an Iterator

The first approach uses a custom implementation from the Iterator interface. We define a CustomBatchIterator class, and we can set the batch size when initializing a new instance of the Iterator.

Let’s jump into the code:

public class CustomBatchIterator<T> implements Iterator<List<T>> {
    private final int batchSize;
    private List<T> currentBatch;
    private final Iterator<T> iterator;
    private CustomBatchIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.iterator = sourceIterator;
    }
    @Override
    public List<T> next() {
        prepareNextBatch();
        return currentBatch;
    }
    @Override
    public boolean hasNext() {
        return iterator.hasNext();
    }
    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (iterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(iterator.next());
        }
    }
}

Here, we’ve overridden the hasNext() and next() methods of the Iterator interface in our CustomBatchIterator class. The hasNext() method simply delegates to the source iterator held in the iterator field, while next() first prepares the current batch and then returns it.

The prepareNextBatch() method populates the current batch with elements from the source iterator until the batch is full or the source iterator runs out of elements, whichever happens first. currentBatch is initialized as an empty list with an initial capacity of batchSize.

Additionally, we declare the CustomBatchIterator constructor as private. This prevents CustomBatchIterator from being instantiated outside its class scope. We’ll add a static batchStreamOf() method to make CustomBatchIterator usable.

The next step is to add two static methods to our class:

public class CustomBatchIterator<T> implements Iterator<List<T>> {

    // other methods

    public static <T> Stream<List<T>> batchStreamOf(Stream<T> stream, int batchSize) {
        return stream(new CustomBatchIterator<>(stream.iterator(), batchSize));
    }
    private static <T> Stream<T> stream(Iterator<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false);
    }
}

Our batchStreamOf() method generates a stream of batches from a data stream. It accomplishes this by instantiating the CustomBatchIterator class and handing it to the stream() method, which produces a Stream from an Iterator.

Our stream() method creates a Spliterator (a special iterator that can be traversed as a stream) from the Iterator using the Spliterators.spliteratorUnknownSize() method and then passes the Spliterator to the StreamSupport.stream() method to build the stream.

Now, it’s time to test our implementation:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    CustomBatchIterator.batchStreamOf(data, BATCH_SIZE).forEach(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

In the above test, we pass our data stream and batch size to the batchStreamOf() method. Then, we check that we got four batches after data processing.

5. Using Collection API

Our next example uses the Collection API and is more straightforward than the first one.

Let’s see our test case:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
      .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

In this code snippet, we use the Collectors.groupingBy() method from the Java Streams API to group the elements of the data stream by a key calculated with the it -> it / BATCH_SIZE lambda expression. Since this is integer division, all elements belonging to the same batch map to the same key.

Then, we invoke the map’s values() method to retrieve a collection of the lists of elements, which we save in the result variable.
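To make the grouping key concrete, here's a small standalone sketch (using the same data and batch size as the test above) that shows the intermediate Map before values() discards the keys:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GroupingKeyDemo {
    // Group the values 0..n-1 into batches keyed by element / batchSize
    static Map<Integer, List<Integer>> batches(int n, int batchSize) {
        return IntStream.range(0, n)
          .boxed()
          .collect(Collectors.groupingBy(it -> it / batchSize));
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> byBatch = batches(34, 10);
        // Keys 0..3 identify the four batches; values() would discard them
        System.out.println(byBatch.keySet());
        System.out.println(byBatch.get(3)); // the partial last batch: [30, 31, 32, 33]
    }
}
```

Keeping the keys around can be useful when the batch index matters; note also that this approach collects the entire stream into the map, so it assumes the data fits in memory.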

We can use the parallel() method from Stream for large datasets. However, we need to consider that the order of execution is out of our control. It may change every time we run the program.

Let’s check our test case using parallel():

@Test
public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = data.parallel()
      .collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
      .values();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

6. RxJava

RxJava is a Java version of ReactiveX, which is a library for composing asynchronous and event-based programs using observable sequences. We can use it in conjunction with the Streams API to do batch processing in Java.

First, let’s add its dependency in our pom.xml file:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.5</version>
</dependency>

Our next step is to implement the test case:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Observable.fromStream(data)
      .buffer(BATCH_SIZE)
      .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

To divide the data stream into manageable chunks, this code uses the buffer() operator from the RxJava library, with each chunk’s size determined by the variable BATCH_SIZE.

Additionally, we create an Observable from the data stream using the Observable.fromStream() method. We call the Observable‘s buffer() method with BATCH_SIZE as the input. The Observable items are grouped into lists of the size we choose, and each list is emitted as a new item in the stream.

The result is an Observable, and the subscribe() method is invoked on it with result::add as its argument. This creates a subscription to the Observable, and every time the Observable emits an item, the add() method of the result collection is invoked. In this scenario, the Observable‘s output consists of lists of batched elements.

7. Vavr

Vavr is a functional programming library with immutable collections and other functional data structures.

In this case, we add its dependency in our pom.xml file:

<dependency>
    <groupId>io.vavr</groupId>
    <artifactId>vavr</artifactId>
    <version>1.0.0-alpha-4</version>
</dependency>

Now, let’s see the test case in action:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() {
    List<List<Integer>> result = Stream.ofAll(data)
      .toList()
      .grouped(BATCH_SIZE)
      .toList();
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The Stream.ofAll() method transforms the data stream into a Vavr Stream. Then, we use Stream‘s toList() method to turn it into a List. This List is passed as an argument to the grouped() method along with BATCH_SIZE. The grouped() method returns an ordered List of inner Lists, each containing up to BATCH_SIZE consecutive elements taken from the original list.

Note that the List class in the above test comes from the io.vavr.collection package, not from java.util.

8. Reactor

The next option for batch processing is the Reactor library. Reactor is a Java library for reactive programming that offers several operators for working with streams, including ones for batch processing. In this case, we’ll work with Flux to do the batch processing.

For this example, let’s add the dependency to our pom.xml file:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.1</version>
</dependency>

Let’s implement our test case:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Flux.fromStream(data)
      .buffer(BATCH_SIZE)
      .subscribe(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

To create a Flux from a java.util.stream.Stream object, we use the Flux.fromStream() method. This is handy when we want to process the elements of the stream using the reactive operators provided by the Flux class.

The buffer() operator is used to batch elements into fixed-size lists. Each element the Flux emits is added to the current list. When the list reaches the appropriate size, the Flux emits it, and a new list is started. This can be useful for batch-processing optimization, such as lowering the number of database queries or network requests.

Finally, the subscribe() method adds a subscriber to the Flux. The subscriber receives the items emitted by the Flux and adds them to the result object. The subscribe() method returns a Disposable, which may be used to cancel the subscription when it’s no longer required.

9. Apache Commons

We can use a powerful library such as Apache Commons Collections to perform batch processing.

Let’s add its dependency in our pom.xml file:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.5.0-M2</version>
</dependency>

Our test implementation is straightforward:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>(ListUtils
      .partition(data.collect(Collectors.toList()), BATCH_SIZE));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The partition() method is an Apache Commons ListUtils utility method that accepts a List and a size. It produces a List of Lists, with each inner List containing at most the given number of elements. We can notice that the data stream is converted into a List before being passed to the partition() method.

10. Guava

Next, we have the Guava library. Guava offers a variety of utility methods for working with collections, including batch processing.

Let’s add the dependency in our pom.xml file:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>33.0.0-jre</version>
</dependency>

Now, let’s see our working example:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    Iterators.partition(data.iterator(), BATCH_SIZE).forEachRemaining(result::add);
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The Iterators.partition() method can help break a large dataset into smaller chunks for processing, such as analyzing the data in parallel or loading it into a database in batches.

We pass the Iterator from our data stream to the Iterators.partition() method, along with BATCH_SIZE, to split the data into a series of smaller Lists delivered through a new Iterator.

11. Cyclops

Finally, we have the Cyclops library, which is built on top of the jOOλ (jool) library. Cyclops includes several operators for interacting with streams, including some for batch processing.

Let’s add its dependency to our pom.xml:

<dependency>
    <groupId>com.oath.cyclops</groupId>
    <artifactId>cyclops</artifactId>
    <version>10.4.1</version>
</dependency>

And let’s look at the code for our last example:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    ReactiveSeq.fromStream(data)
      .grouped(BATCH_SIZE)
      .toList()
      .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

The ReactiveSeq class represents a reactive sequence. The ReactiveSeq.fromStream() method turns the data stream into a reactive sequence. Then, the data is grouped into batches of BATCH_SIZE, and the processed data is collected into a collection of Integer Lists.

However, we can get a lazy, functional style using LazySeq. In this case, we only need to replace ReactiveSeq with LazySeq:

@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() {
    Collection<List<Integer>> result = new ArrayList<>();
    LazySeq.fromStream(data)
      .grouped(BATCH_SIZE)
      .toList()
      .forEach(value -> result.add(value.collect(Collectors.toList())));
    assertTrue(result.contains(firstBatch));
    assertTrue(result.contains(secondBatch));
    assertTrue(result.contains(thirdBatch));
    assertTrue(result.contains(fourthBatch));
}

12. Conclusion

In this article, we learned several approaches for accomplishing batch processing of Streams in Java, from native Java APIs to popular libraries like RxJava, Vavr, and Apache Commons.

The code backing this article is available on GitHub.