
Spring Boot and Kafka – Practical Examples

A practical example project using Spring Boot and Kafka with multiple consumers and different serialization methods

This blog post shows how to configure Spring Kafka and Spring Boot to send messages using JSON and receive them in multiple formats: JSON, plain Strings, or byte arrays.

This sample application demonstrates how to use multiple Kafka consumers within the same consumer group with the @KafkaListener annotation so the messages are load-balanced. Each consumer implements a different deserialization approach.

Moreover, this post includes practical exercises adjusting Consumer Group and Topic partitions to help you grasp Kafka concepts.

Updated (May 8th, 2025): This post uses Spring Boot 3.4 and Java 24. If you’re curious about the changes, check the Pull Request.

Multiple consumers in a consumer group

Logical View

To better understand the configuration we’ll use in the examples, have a look at the diagram below. As you can see, we create a Kafka topic with three partitions. There is only one application on the consumer side, but it implements three Kafka consumers with the same group.id property. This configuration is needed to have them in the same Kafka Consumer Group.

Kafka deserialization examples - Three consumers in the same consumer group

When we start the application, Kafka assigns each consumer a different partition. This consumer group will receive the messages in a load-balanced manner, effectively acting as a single conceptual consumer with three parallel workers. Later in this post, you’ll see the difference if we make them have different group identifiers (you probably know the result if you are familiar with Kafka).
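As a rough, Kafka-free sketch of that idea (the consumer names are made up, and real Kafka assignors such as the range or cooperative-sticky strategies are more elaborate than this round-robin stand-in):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Distributes partition numbers across the consumers of one group, round-robin.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        for (int p = 0; p < partitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three consumers in the same group, a topic with three partitions:
        // each consumer ends up with exactly one partition.
        var result = assign(List.of("json", "string", "bytearray"), 3);
        System.out.println(result); // {json=[0], string=[1], bytearray=[2]}
    }
}
```

With more partitions than consumers, some consumers get several partitions; with fewer, some consumers sit idle — the same behavior you can observe later with the real application.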

The Example Use Case

The logic we are going to build is simple. Each time we call a given REST endpoint, /hello, the app will produce a configurable number of messages and send them to the same topic, using a sequence number as the Kafka key. The application will wait (using a CountDownLatch) for all messages to be consumed before returning a message, Hello Kafka!. There will be three consumers, each using a different deserialization mechanism, that will decrease the latch count when they receive a new message.

Easy, right? Let’s see how to build it.

Setting up Spring Boot and Kafka

All the code in this post is available on GitHub: Kafka and Spring Boot Example. If you find it useful, please give it a star!

Starting up Kafka

First, you need to have a running Kafka cluster to connect to. I will use Docker Compose and Kafka running on a single container node for this application. This is far from being a production configuration, but it is good enough for the goal of this post.

docker-compose.yml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.9.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "2181"]
      interval: 10s
      timeout: 5s
      retries: 5
  kafka:
    image: confluentinc/cp-kafka:7.9.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: LISTENER_DOCKER://0.0.0.0:29092,LISTENER_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER://kafka:29092,LISTENER_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER:PLAINTEXT,LISTENER_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "9092"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "9080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: docker_local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

This configuration instructs Docker Compose to start three containers: the Kafka broker node, Zookeeper (required by this Kafka setup), and an optional one exposing a user interface to monitor and manage Kafka (kafka-ui).

The Kafka node connects to Zookeeper within the Docker network and exposes a listener to the external network (the host computer). If you want to learn more about this basic configuration of Kafka listeners, check this article from Confluent.

The Kafka UI container is available on localhost:9080 and gives you a nice overview of the setup: topics, messages, etc. It even has functionality to reset offsets, send messages, and more. I recommend having a look at this tool and navigating through the configuration we’ll create.

Let’s not dive into any more details. For now, just note that I configured Kafka not to create topics automatically (KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'). We will create our topic from the Spring Boot application since we want to specify some custom configuration.

To start up our containers, just run docker-compose up from the folder where this file lives.

Basic Spring Boot and Kafka application

Using Spring Initializr to create the Kafka example project

The easiest way to get a skeleton for our app is to navigate to start.spring.io, fill in the basic details for our project, and select Kafka as a dependency. Then, download the zip file and use your favorite IDE to load the sources.

Let’s use YAML for our configuration. You may need to rename the application.properties file inside src/main/resources to application.yml. These are the configuration values we are going to use for this sample application:

application.yml

spring:
  kafka:
    consumer:
      group-id: tpd-loggers
      auto-offset-reset: earliest
    # change this property if you are using your own
    # Kafka cluster or your Docker IP is different
    bootstrap-servers: localhost:9092

tpd:
  topic-name: advice-topic
  messages-per-request: 10

The first block of properties is the Spring Kafka configuration:

  • The group-id our consumers use by default.
  • The auto-offset-reset property is set to earliest. This means that consumers will start reading messages from the earliest one available when no existing offset exists for that consumer.
  • In this case, the Kafka server is the only one available since we use the single-node configuration. Note that this property is redundant if you use the default value, localhost:9092. I explicitly added it there to clarify it and let you know you must change it if your Docker is mapped to a different IP.

The second block is application-specific. We define the Kafka topic name and the number of messages to send when making an HTTP REST API call.

The Message class

This is the Java record that we will use as a Kafka message. We need to use the @JsonProperty annotations for the record fields so Jackson can deserialize the message correctly. If you’re using a version of Java that doesn’t support records, you can use the old version of this file available on GitHub.

PracticalAdvice.java

package io.tpd.kafkaexample;

import com.fasterxml.jackson.annotation.JsonProperty;

record PracticalAdvice(@JsonProperty("message") String message,
                       @JsonProperty("identifier") int identifier) {
}

Kafka Producer configuration in Spring Boot

We will add the configuration in the main Spring Boot class to keep the application simple. Eventually, we want to include both producer and consumer configurations here and use three different variations for deserialization. Remember that you can find the complete source code in the GitHub repository.

First, let’s focus on the Producer configuration.

KafkaExampleApplication.java

@SpringBootApplication
public class KafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }

    @Autowired
    private KafkaProperties kafkaProperties;

    @Value("${tpd.topic-name}")
    private String topicName;

    // Producer configuration

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props =
                new HashMap<>(kafkaProperties.buildProducerProperties());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic adviceTopic() {
        return new NewTopic(topicName, 3, (short) 1);
    }
}

In this configuration, we are setting up two parts of the application:

  • The KafkaTemplate instance. This is the object we employ to send messages to Kafka. We don’t want to use the default one because we want to play with different settings, so we need to inject our custom version into Spring’s application context.

    • We type (with generics) the KafkaTemplate to have a plain String key and an Object as value. The reason for having Object as a value is that we want to send multiple object types with the same template.
    • The KafkaTemplate accepts a ProducerFactory as a parameter. We create a default factory with our custom producer configuration values.
    • The Producer Configuration is a simple key-value map. We inject the default properties using @Autowired to obtain the KafkaProperties bean. Then, we build our map, passing the default values for the producer and overriding the default Kafka key and value serializers. The producer will serialize keys as Strings using the Kafka library’s StringSerializer, and will do the same for values, but this time using JSON, with a JsonSerializer (provided by Spring Kafka).
  • The Kafka topic. When we inject a NewTopic bean, we’re instructing Kafka’s AdminClient bean (already in the context) to create a topic with the given configuration. The first parameter is the name (advice-topic), which we used in the app configuration. The second parameter is the number of partitions (3), and the third one is the replication factor (1, since we’re using a single node anyway).

About Kafka Serializers and Deserializers for Java

The core Kafka library (Javadoc) provides a few basic serializers for Strings, the number classes, and byte arrays; the JSON ones are provided by Spring Kafka (Javadoc).

On top of that, you can create your own Serializers and Deserializers just by implementing the Serializer or Deserializer interfaces. That gives you a lot of flexibility to optimize the amount of data traveling through Kafka, should you need to do so. As you can see in those interfaces, Kafka works with plain byte arrays, so, eventually, no matter what complex type you’re working with, it needs to be transformed into a byte[].

With that understanding, you may wonder why someone would want to use JSON with Kafka. It’s inefficient since you’re transforming your objects to JSON and then to a byte array. But you have to consider two main advantages of doing this:

  • A human can more easily read JSON than an array of bytes. If you want to debug or analyze the contents of your Kafka topics, it will be way more straightforward than looking at bare bytes.
  • JSON is a standard, whereas default byte array serializers depend on the programming language implementation. Thus, to consume messages from multiple programming languages, you must replicate the (de)serializer logic in all those languages.

On the other hand, if you are concerned about the traffic load in Kafka, storage, or speed in (de)serialization, you may want to choose byte arrays or even implement your serializer/deserializer.
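To make that trade-off concrete, here is a small, self-contained sketch (plain Java, no Kafka classes) comparing what a PracticalAdvice-like message costs on the wire as JSON text versus a hypothetical hand-rolled binary layout:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerializationSizeSketch {

    // JSON rendering of the message: readable and language-neutral,
    // but it carries the field names and the JSON syntax in every message.
    static byte[] toJsonBytes(String message, int identifier) {
        String json = "{\"message\":\"" + message + "\",\"identifier\":" + identifier + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Hand-rolled binary layout: a length-prefixed string plus a 4-byte int.
    // Compact, but every consumer (in every language) must know this exact layout.
    static byte[] toBinaryBytes(String message, int identifier) {
        byte[] msgBytes = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + msgBytes.length + 4)
                .putInt(msgBytes.length)
                .put(msgBytes)
                .putInt(identifier)
                .array();
    }

    public static void main(String[] args) {
        System.out.println("JSON:   " + toJsonBytes("A Practical Advice", 7).length + " bytes");
        System.out.println("Binary: " + toBinaryBytes("A Practical Advice", 7).length + " bytes");
    }
}
```

The JSON variant takes 47 bytes here (the same serialized value size that shows up in the consumer logs later in this post), while the binary layout needs 26. The gap grows with the number of fields, which is why high-throughput setups often move to compact binary formats.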

Sending messages with Spring Boot and Kafka

Following the plan, we create a REST controller and use the injected KafkaTemplate to produce some JSON messages when a client calls the endpoint.

This is the first implementation of the controller, containing only the logic that produces the messages.

HelloKafkaController.java

@RestController
public class HelloKafkaController {

    private static final Logger logger =
            LoggerFactory.getLogger(HelloKafkaController.class);

    private final KafkaTemplate<String, Object> template;
    private final String topicName;
    private final int messagesPerRequest;
    private CountDownLatch latch;

    public HelloKafkaController(
            final KafkaTemplate<String, Object> template,
            @Value("${tpd.topic-name}") final String topicName,
            @Value("${tpd.messages-per-request}") final int messagesPerRequest) {
        this.template = template;
        this.topicName = topicName;
        this.messagesPerRequest = messagesPerRequest;
    }

    @GetMapping("/hello")
    public String hello() throws Exception {
        latch = new CountDownLatch(messagesPerRequest);
        IntStream.range(0, messagesPerRequest)
                .forEach(i -> this.template.send(topicName, String.valueOf(i),
                        new PracticalAdvice("A Practical Advice", i))
                );
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All messages received");
        return "Hello Kafka!";
    }
}

In the constructor, we pass some configuration parameters and the KafkaTemplate that we customized to send String keys and JSON values. Then, when the API client requests the /hello endpoint, we send 10 messages (that’s the configuration value of messagesPerRequest), and then block the thread for a maximum of 60 seconds. As you can see, there is no implementation yet for the Kafka consumers to decrease the latch count. Once the latch gets unlocked, we return the message Hello Kafka! to our client.

This idea of using a latch isn’t a real-life pattern, but it’s suitable for us to learn how producers and consumers work. That way, you can check the number of messages received. If you prefer, you can remove the latch and return the “Hello Kafka!” message before receiving the messages. That’d be a more realistic case where the flow happens asynchronously.
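The latch mechanism itself has nothing to do with Kafka, so if it is new to you, here is a minimal, Kafka-free sketch of the same pattern (the thread-per-message setup is just a stand-in for the three listeners):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchSketch {

    // Starts one thread per "message"; each counts the latch down once,
    // and the caller blocks until all of them have done so (or the timeout hits).
    static boolean waitForAllMessages(int messages) {
        CountDownLatch latch = new CountDownLatch(messages);
        for (int i = 0; i < messages; i++) {
            new Thread(() -> {
                // ... a real @KafkaListener would process the record here ...
                latch.countDown();
            }).start();
        }
        try {
            // Same idea as the /hello endpoint: block for up to 60 seconds.
            return latch.await(60, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForAllMessages(10) ? "All messages received" : "Timed out");
    }
}
```

Note that a CountDownLatch cannot be reset: that is why the controller creates a fresh one on every request.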

Kafka Consumer configuration

As mentioned in this post, we want to demonstrate different ways of deserialization with Spring Boot and Spring Kafka and simultaneously see how multiple consumers can be load-balanced when they are part of the same consumer group.

KafkaExampleApplication.java

@SpringBootApplication
public class KafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }

    @Autowired
    private KafkaProperties kafkaProperties;

    @Value("${tpd.topic-name}")
    private String topicName;

    // Producer configuration
    // omitted...

    // Consumer configuration

    // If you only need one deserialization type, you only need to set the
    // Consumer configuration properties. Uncomment this and remove all others below.
//    @Bean
//    public Map<String, Object> consumerConfigs() {
//        Map<String, Object> props = new HashMap<>(
//                kafkaProperties.buildConsumerProperties()
//        );
//        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
//                StringDeserializer.class);
//        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
//                JsonDeserializer.class);
//        props.put(ConsumerConfig.GROUP_ID_CONFIG,
//                "tpd-loggers");
//
//        return props;
//    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    // String Consumer Configuration

    @Bean
    public ConsumerFactory<String, String> stringConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new StringDeserializer()
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stringConsumerFactory());

        return factory;
    }

    // Byte Array Consumer Configuration

    @Bean
    public ConsumerFactory<String, byte[]> byteArrayConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new ByteArrayDeserializer()
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerByteArrayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(byteArrayConsumerFactory());
        return factory;
    }
}

This configuration may look extensive. The reason is that, for educational purposes, we have repeated the creation of the ConsumerFactory and KafkaListenerContainerFactory instances three times. This way, we can demonstrate the three types of deserialization by assigning a different one to each consumer.

The basic steps to configure a consumer are:

  1. [Omitted] We set up the consumer properties similarly to the ones we used for the producer. We can skip this step because the only configuration we need is the Group ID, specified in the Spring Boot properties file, and the key and value deserializers, which we override while creating the customized consumer and KafkaListener factories. If you only need one configuration because all your consumers use the same key and value deserializer types (as typically happens in many real applications), this commented code block is the only thing you need; just adjust the deserializer types to the ones you want to use.

  2. Create the ConsumerFactory to be used by the KafkaListenerContainerFactory. We create three, switching the value deserializer in each case to 1) a JSON deserializer, 2) a String deserializer, and 3) a Byte Array deserializer.

    1. Note that after creating the JSON Deserializer, we include an extra step to specify that we trust all packages. You can fine-tune this in your application if you want. If we don’t do this, we will get an error message: java.lang.IllegalArgumentException: The class [] is not in the trusted packages.
  3. Construct the Kafka Listener container factory (a concurrent one) using the previously configured Consumer Factory. Again, we do this three times to use a different one per instance.

Receiving messages with Spring Boot and Kafka in JSON, String, and byte[] formats

It’s time to show what Kafka consumers look like. We will use the @KafkaListener annotation since it simplifies the process and takes care of the deserialization of the passed Java type.

HelloKafkaController.java

@RestController
public class HelloKafkaController {

    private static final Logger logger =
            LoggerFactory.getLogger(HelloKafkaController.class);

    private final KafkaTemplate<String, Object> template;
    private final String topicName;
    private final int messagesPerRequest;
    private CountDownLatch latch;

    public HelloKafkaController(
            final KafkaTemplate<String, Object> template,
            @Value("${tpd.topic-name}") final String topicName,
            @Value("${tpd.messages-per-request}") final int messagesPerRequest) {
        this.template = template;
        this.topicName = topicName;
        this.messagesPerRequest = messagesPerRequest;
    }

    @GetMapping("/hello")
    public String hello() throws Exception {
        latch = new CountDownLatch(messagesPerRequest);
        IntStream.range(0, messagesPerRequest)
                .forEach(i -> this.template.send(topicName, String.valueOf(i),
                        new PracticalAdvice("A Practical Advice", i))
                );
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All messages received");
        return "Hello Kafka!";
    }

    @KafkaListener(topics = "advice-topic", clientIdPrefix = "json",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr,
                              @Payload PracticalAdvice payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    @KafkaListener(topics = "advice-topic", clientIdPrefix = "string",
            containerFactory = "kafkaListenerStringContainerFactory")
    public void listenAsString(ConsumerRecord<String, String> cr,
                              @Payload String payload) {
        logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    @KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
            containerFactory = "kafkaListenerByteArrayContainerFactory")
    public void listenAsByteArray(ConsumerRecord<String, byte[]> cr,
                                  @Payload byte[] payload) {
        logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
        latch.countDown();
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }
}

There are three listeners in this class. First, let’s describe the @KafkaListener annotation’s parameters so we can understand the differences:

  • All listeners are consuming from the same topic, advice-topic. This parameter is mandatory.
  • The parameter clientIdPrefix is optional. I’m using it here for the logs to be more human-friendly. You will know which consumer does what by its name prefix. Kafka will append a number to this prefix.
  • The containerFactory parameter is optional; you can also rely on naming convention. If you don’t specify it, it will look for a bean with the name kafkaListenerContainerFactory, which is also the default name used by Spring Boot when autoconfiguring Kafka. You can also override it by using the same name (although it looks like magic for someone who doesn’t know about the convention). We need to set it explicitly because we want to use a different one for each listener, to be able to use different deserializers.

Note that the first argument passed to all listeners is the same: a ConsumerRecord. The second one, annotated with @Payload, is redundant if we use the first. We can access the payload using the value() method of ConsumerRecord, but I included it so you can see how simple it is to get the message payload directly via inferred deserialization.

The TypeId Header in Kafka

The __TypeId__ header is automatically set by Spring Kafka’s JsonSerializer by default. The utility method typeIdHeader that I use here is just for educational purposes: it gets the string representation, since you will only see a byte array in the output of ConsumerRecord’s toString() method. This TypeId header can be helpful for deserialization, so you can find the type to map the data to. You don’t need it for JSON deserialization in this setup, because the deserializer provided by the Spring team reads that header and infers the type from the listener method’s argument.
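As a standalone illustration of what typeIdHeader does, here is a Kafka-free version of the same lookup. The header list here is a simplified, hypothetical stand-in for Kafka's Headers type (a key mapped to raw bytes), and the byte values are the UTF-8 encoding of the class name exactly as they appear in the consumer logs later in this post:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TypeIdHeaderSketch {

    // Stand-in for Kafka's Headers: a list of (key, raw byte value) entries.
    static String typeIdHeader(List<Map.Entry<String, byte[]>> headers) {
        return headers.stream()
                .filter(h -> h.getKey().equals("__TypeId__"))
                .findFirst()
                // The header value travels as bytes; decode it into a readable name.
                .map(h -> new String(h.getValue(), StandardCharsets.UTF_8))
                .orElse("N/A");
    }

    public static void main(String[] args) {
        var headers = List.of(
                Map.entry("__TypeId__",
                        "io.tpd.kafkaexample.PracticalAdvice"
                                .getBytes(StandardCharsets.UTF_8))
        );
        System.out.println(typeIdHeader(headers)); // io.tpd.kafkaexample.PracticalAdvice
        System.out.println(typeIdHeader(List.of())); // N/A
    }
}
```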

Running the application

All the code in this post is available on GitHub: Kafka and Spring Boot Example. If you find it useful, please give it a star!

Now that we have finished the Kafka producer and consumers, we can run Kafka and the Spring Boot app:

$ docker-compose up -d
Starting kafka-example_zookeeper_1 ... done
Starting kafka-example_kafka_1     ... done
$ mvn spring-boot:run
...

The Spring Boot app starts, and the consumers are registered in Kafka, which assigns them a partition. We configured the topic with three partitions, so each consumer gets one of them assigned.

Terminal - Spring Boot App - Kafka Topic Partitions

[Consumer clientId=bytearray-0, groupId=tpd-loggers] Subscribed to topic(s): advice-topic
[Consumer clientId=json-0, groupId=tpd-loggers] Subscribed to topic(s): advice-topic
[Consumer clientId=string-0, groupId=tpd-loggers] Subscribed to topic(s): advice-topic
[...]
[Consumer clientId=bytearray-0, groupId=tpd-loggers] Notifying assignor about the new Assignment(partitions=[advice-topic-0])
[Consumer clientId=string-0, groupId=tpd-loggers] Notifying assignor about the new Assignment(partitions=[advice-topic-2])
[Consumer clientId=json-0, groupId=tpd-loggers] Notifying assignor about the new Assignment(partitions=[advice-topic-1])

We can now try an HTTP call to the service. You can use your browser or a command-line tool like curl, for example:

Terminal - Make an HTTP call to /hello

$ curl localhost:8080/hello

The output in the logs should look like this:

Terminal - Spring Boot App - Consumer logs for each deserializer type

[ntainer#1-0-C-1] [...]: Logger 2 [String] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":0} | Record: ConsumerRecord(topic = advice-topic, partition = 2, leaderEpoch = 0, offset = 16, CreateTime = 1633682802702, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = {"message":"A Practical Advice","identifier":0})
[ntainer#2-0-C-1] [...]: Logger 3 [ByteArray] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 49, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 16, CreateTime = 1633682802753, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = [B@7a82cd87)
[ntainer#2-0-C-1] [...]: Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 17, CreateTime = 1633682802754, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@489fdc2)
[ntainer#1-0-C-1] [...]: Logger 2 [String] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":2} | Record: ConsumerRecord(topic = advice-topic, partition = 2, leaderEpoch = 0, offset = 17, CreateTime = 1633682802753, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = {"message":"A Practical Advice","identifier":2})
[ntainer#2-0-C-1] [...]: Logger 3 [ByteArray] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 55, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 18, CreateTime = 1633682802754, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = [B@4750641d)
[ntainer#1-0-C-1] [...]: Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 2, leaderEpoch = 0, offset = 18, CreateTime = 1633682802753, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3})
[ntainer#2-0-C-1] [...]: Logger 3 [ByteArray] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 56, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1633682802754, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = [B@36e0d7e5)
[ntainer#1-0-C-1] [...]: Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 2, leaderEpoch = 0, offset = 19, CreateTime = 1633682802754, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9})
[ntainer#0-0-C-1] [...]: Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=4] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 8, CreateTime = 1633682802754, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice[message=A Practical Advice, identifier=4])
[ntainer#0-0-C-1] [...]: Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=6] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 9, CreateTime = 1633682802754, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice[message=A Practical Advice, identifier=6])
[nio-8080-exec-1] [...]: All messages received

Explanation

Kafka hashes the message key (a simple string identifier in our case) and, based on that hash, assigns each message to a partition. Each consumer receives the messages in its assigned partition and converts them to a Java object using its deserializer. Remember, our producer always sends JSON values.
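To see that idea in isolation, here is a minimal, hedged sketch of key-based partitioning. Note this is a simplification: the real Kafka client hashes the serialized key bytes with murmur2, while this sketch uses `String.hashCode()` purely to illustrate the "same key, same partition" property.

```java
public class PartitionDemo {

    // Simplified stand-in for Kafka's default partitioner. The real client
    // hashes the serialized key bytes with murmur2; String.hashCode() is
    // used here only for illustration purposes.
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (int key = 0; key < 10; key++) {
            System.out.printf("key %d -> partition %d%n",
                    key, partitionFor(String.valueOf(key), partitions));
        }
    }
}
```

Running it twice prints the same mapping: the assignment is deterministic, which is what makes keyed load balancing predictable.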

As you can see in the logs, each deserializer accomplishes its task: the String consumer prints the raw JSON message, the ByteArray consumer shows the byte representation of that JSON String, and the JSON deserializer uses the Java type mapper to convert it back to the original class, PracticalAdvice. Look at the logged ConsumerRecord and you'll see the headers, the assigned partition, the offset, etc.

That’s how you can send and receive JSON messages with Spring Boot and Kafka. And now, you can use the same code example to learn more about how Kafka works. Keep reading below.

Try some Kafka Exercises

If you are new to Kafka, you may want to try some code changes to better understand how it works.

Request /hello multiple times

Make a few requests, then look at how the messages are distributed across partitions. Kafka messages with the same key are always placed in the same partition. This feature is handy when you want to ensure that all messages for a given user, process, or whatever entity your logic works on are received by the same consumer, in the same order as they were produced, no matter how much load balancing you're doing. This is often referred to as stickiness, and you can choose which property to use as the key to stick to the same processor.

As a practical example, say the consumers receive UserUpdatedEvent objects and make decisions based on them. For example, there could be three parallel workers responsible for the user privacy-terms logic:

  1. Store the version of the accepted terms for each user in a local database, shared by all the consumer instances.
  2. Delete the user data when the event’s data indicates a user’s deletion.

If you process the same user's events in two parallel workers, you risk race conditions: a slow worker processing an event of type 1 could try to update a user that no longer exists, because a faster worker already processed an event of type 2. That's why partition key modelling is essential in applications using Kafka. Ideally, all the events for the same user in this application are processed by the same worker.
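A minimal sketch of that idea, assuming hypothetical event names (`TermsAccepted`, `UserDeleted`) and a simple hash in place of Kafka's murmur2 partitioner:

```java
import java.util.ArrayList;
import java.util.List;

public class KeyedRoutingDemo {
    public static void main(String[] args) {
        int workers = 3;
        // One queue per worker, mimicking one partition per consumer.
        List<List<String>> queues = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            queues.add(new ArrayList<>());
        }

        // Two ordered events for the same user: accept terms, then delete.
        String userId = "user-42";
        String[] events = {"TermsAccepted(v3)", "UserDeleted"};
        for (String event : events) {
            // Keying by userId routes every event for this user to the
            // same worker, preserving production order.
            int worker = Math.abs(userId.hashCode() % workers);
            queues.get(worker).add(event);
        }

        System.out.println(queues);
    }
}
```

Both events land in the same queue, in order, so no second worker can process UserDeleted while a slower one is still handling TermsAccepted.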

Reduce the number of partitions

Kafka – more consumers in a group than partitions

First, restart Kafka to discard the previous configuration.

Then, redefine the topic in the application to have only two partitions:

KafkaExampleApplication.java - Reconfigure the topic

@Bean
public NewTopic adviceTopic() {
    return new NewTopic(topicName, 2, (short) 1);
}

Now, rerun the app and make an HTTP request to the /hello endpoint.

Terminal - Spring Boot App - Consumer with no assigned partition

Logger 3 [ByteArray] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 48, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1633685589028, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = [B@344c2b5)
Logger 3 [ByteArray] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 50, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1633685589069, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = [B@57d87942)
Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1633685589070, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@76636ec3)
Logger 3 [ByteArray] received key 6: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 54, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1633685589070, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 6, value = [B@2408845d)
Logger 1 [JSON] received key 1: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=1] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1633685589069, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = PracticalAdvice[message=A Practical Advice, identifier=1])
Logger 1 [JSON] received key 3: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=3] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 1, CreateTime = 1633685589070, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 3, value = PracticalAdvice[message=A Practical Advice, identifier=3])
Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=4] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1633685589070, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice[message=A Practical Advice, identifier=4])
Logger 1 [JSON] received key 7: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=7] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 3, CreateTime = 1633685589070, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 7, value = PracticalAdvice[message=A Practical Advice, identifier=7])
Logger 1 [JSON] received key 8: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=8] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 4, CreateTime = 1633685589070, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 8, value = PracticalAdvice[message=A Practical Advice, identifier=8])
Logger 1 [JSON] received key 9: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=9] | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1633685589070, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 9, value = PracticalAdvice[message=A Practical Advice, identifier=9])
All messages received

One of the consumers (in this case, the String consumer) doesn't receive any messages; only the JSON and ByteArray consumers are active. This behavior is expected: there are no more partitions available within the consumer group to assign to the additional consumer. To reproduce this behavior, you may need to destroy the Kafka cluster in Docker with docker-compose down -v and start it again, so the topic can be recreated from Java.

The important takeaway from this practical example is that the number of partitions in your configuration should be equal to or higher than the number of parallel workers you plan to have in your application.
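To see why the extra consumer sits idle, here is a rough Java sketch of partition assignment within a group. The real rebalancing protocol is more involved (range and sticky assignors, rebalance rounds); this simply deals partitions out one by one, which is enough to show that with fewer partitions than consumers, someone ends up empty-handed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentDemo {
    public static void main(String[] args) {
        List<String> consumers = List.of("json", "string", "bytearray");
        int partitions = 2;

        // Deal each partition to one consumer; with fewer partitions than
        // consumers, at least one consumer ends up with an empty list.
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }

        System.out.println(assignment);
        // e.g. {json=[0], string=[1], bytearray=[]}
    }
}
```

Which consumer ends up idle in the real application depends on the assignor, not on this toy round-robin order, but the outcome is the same: one of the three gets no partition.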

Change one Consumer’s Group Identifier

Kafka – Two consumer groups

Let’s keep the changes from the previous case, so the topic still has only two partitions. Then, change the group ID of one of our consumers. This conceptually makes it an independent consumer, so it no longer shares the load with the other group.

HelloKafkaController - Changing Kafka Group Identifier to tpd-loggers-2

@KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
        containerFactory = "kafkaListenerByteArrayContainerFactory",
        groupId = "tpd-loggers-2")
public void listenAsByteArray(ConsumerRecord<String, byte[]> cr,
                              @Payload byte[] payload) {
    logger.info("Logger 3 [ByteArray] received key {}", cr.key());
    latch.countDown();
}

We also changed the logged message to easily differentiate it from the rest: now, it only prints the received key. In addition, we need to change the CountDownLatch so that it expects twice the number of messages, since there are two different consumer groups. Keep reading for a detailed explanation.

HelloKafkaController - Adjusting latch to expect twice the number of messages

latch = new CountDownLatch(messagesPerRequest * 2);

Why do we need this? As I described at the beginning of this post, when consumers belong to the same Consumer Group, they’re (conceptually) working on the same task. We’re implementing a load-balanced mechanism in which concurrent workers get messages from different partitions without needing to process each other’s messages. They’re splitting the job across workers.

In this example, I also changed the “conceptual task” of the last consumer to understand this better: it’s doing something different, printing a different message in this case. Since we changed the group ID, this consumer works separately from the others. Kafka will assign both partitions to it since it’s the only one in its group. The Byte Array consumer receives all messages now. See the logs below.
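The latch arithmetic can be checked in isolation with a minimal, self-contained sketch (group names in the comments refer to the groups used in this post): every message is counted down once per consumer group, so with two groups the latch must start at twice the message count.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int messagesPerRequest = 10;
        // Two independent consumer groups each receive every message,
        // so the endpoint must wait for twice the count.
        CountDownLatch latch = new CountDownLatch(messagesPerRequest * 2);

        for (int i = 0; i < messagesPerRequest; i++) {
            latch.countDown(); // counted by the tpd-loggers group
            latch.countDown(); // counted again by the tpd-loggers-2 group
        }

        boolean done = latch.await(1, TimeUnit.SECONDS);
        System.out.println(done ? "All messages received" : "Timed out");
    }
}
```

If the latch were still initialized with messagesPerRequest, it would reach zero after only half the expected countdowns, and the endpoint would return before the second group finished.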

Terminal - Spring Boot App - Two different consumer groups

Logger 3 [ByteArray] received key 0
Logger 3 [ByteArray] received key 2
Logger 3 [ByteArray] received key 5
Logger 3 [ByteArray] received key 6
Logger 2 [String] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":1} | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1633691293887, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = {"message":"A Practical Advice","identifier":1})
Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 1, CreateTime = 1633691293887, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3})
Logger 2 [String] received key 4: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1633691293887, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 4, value = {"message":"A Practical Advice","identifier":4})
Logger 2 [String] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":7} | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 3, CreateTime = 1633691293888, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = {"message":"A Practical Advice","identifier":7})
Logger 2 [String] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":8} | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 4, CreateTime = 1633691293888, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = {"message":"A Practical Advice","identifier":8})
Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 1, leaderEpoch = 0, offset = 5, CreateTime = 1633691293888, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9})
Logger 1 [JSON] received key 0: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=0] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1633691293853, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = PracticalAdvice[message=A Practical Advice, identifier=0])
Logger 1 [JSON] received key 2: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=2] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1633691293887, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = PracticalAdvice[message=A Practical Advice, identifier=2])
Logger 1 [JSON] received key 5: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=5] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1633691293888, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = PracticalAdvice[message=A Practical Advice, identifier=5])
Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice[message=A Practical Advice, identifier=6] | Record: ConsumerRecord(topic = advice-topic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1633691293888, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice[message=A Practical Advice, identifier=6])
Logger 3 [ByteArray] received key 1
Logger 3 [ByteArray] received key 3
Logger 3 [ByteArray] received key 4
Logger 3 [ByteArray] received key 7
Logger 3 [ByteArray] received key 8
Logger 3 [ByteArray] received key 9
All messages received

These simple exercises should help you quickly grasp these essential Kafka concepts in practice. Remember: if you liked this post, please share it, give the repository a star, or let me know your feedback in the comments.
