Apache Kafka with Java getting started tutorial demonstrates how quickly you can get started with Kafka using Docker. This extends Apache Kafka example with Java – getting started tutorial – Part 1.
Step 1: As discussed in part-1, stand-up the Kafka, Zookeeper & Apache Kafka Cluster Visualization (AKHQ) on a Docker container.
|
1 2 3 |
~/projects/simple-kafka]$ docker-compose up |
Java Producer class ProducerApp.java
Step 2: Create the Java class to produce & send a message to the Kafka broker topic “my-first-topic“.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
package com.myapp; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class ProducerApp { private static final Logger LOG = LoggerFactory.getLogger(ProducerApp.class); public static void main(String[] args) { String topic = "my-first-topic"; Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // create the producer KafkaProducer<String, String> producer = new KafkaProducer<>(properties); // create a producer record ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "Some text to publish"); producer.send(producerRecord); // flush data - synchronous producer.flush(); // flush and close producer producer.close(); } } |
Run the Consumer class ConsumerApp.java
Step 3: Start the ConsumerApp.java class that was created in part-1.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
package com.myapp; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class ConsumerApp { private static final Logger LOG = LoggerFactory.getLogger(ConsumerApp.class); public static void main(String[] args) { String topic = "my-first-topic"; String group = "my-first-application"; Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); LOG.info("Subscribed to Kafka Topic " + topic); //Keeps running & checks for messages every 5 seconds while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord<String, String> record : records) LOG.info(String.format("************ offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value())); } } } |
Run the Producer class ProducerApp.java
Step 4: Run the ProducerApp.java class to send a test message.
Step 5: In the ConsumerApp.java console you see the message printed.
|
1 2 3 4 |
21:25:23.580 [main] INFO com.myapp.ConsumerApp - Subscribed to Kafka Topic my-first-topic 21:25:34.319 [main] INFO com.myapp.ConsumerApp - ************ offset = 0, key = null, value = Some text to publish |
If you double-click on the “my-first-topic” in localhost:9099 you can see the messages published with the offsets.
View the message on akHQ.io UI
Step 6: If you run the producer “ProducerApp.java” two more times, you can see the offset count increase in the “ConsumerApp.java” console & in the akhq.io UI.
|
1 2 3 4 5 6 7 8 |
21:25:23.580 [main] INFO com.myapp.ConsumerApp - Subscribed to Kafka Topic my-first-topic 21:25:34.319 [main] INFO com.myapp.ConsumerApp - ************ offset = 0, key = null, value = Some text to publish 21:37:33.313 [main] INFO com.myapp.ConsumerApp - ************ offset = 1, key = null, value = Some text to publish 21:41:01.351 [main] INFO com.myapp.ConsumerApp - ************ offset = 2, key = null, value = Some text to publish |
Go over the different menu items in the akhq.io UI at localhost:9099 and pay attention to the “offset“, “partition” & “Consumer Groups“.


