Spring Boot Integration with Apache Kafka | Let us see an application example of Spring Boot integration with Apache Kafka.
Apache Kafka – API Details
Apache provides an API to work with the Kafka ecosystem, known as the Apache Kafka API (org.apache.kafka).

- ProducerRecord<K, V>: A combination object created by the Kafka API that stores the topic name and the data after serialization.
- ConsumerRecord<K, V>: The message broker reads data from the given topic name; a copy is delivered to the consumer as this combination object, but this time the data must be deserialized.
- StringSerializer: To convert our message into ProducerRecord format, the data must be serialized, which for String data is done using StringSerializer.
- StringDeserializer: To convert the serialized data back to object format, use StringDeserializer.
- Properties for connection:-
  - bootstrap-server location = localhost:9092
  - zookeeper server = localhost:2181
  - Serializer [for producer] = org.apache.kafka.common.serialization.StringSerializer
  - Deserializer [for consumer] = org.apache.kafka.common.serialization.StringDeserializer

- KafkaTemplate<K, V>: This class is used to send data in key=value format; internally it creates a ProducerRecord<K, V> and sends it to the Kafka server. K = topic name, V = the data, which is serialized and transferred.
- @KafkaListener(topics): Reads data from the Kafka server, which arrives as a ConsumerRecord<K, V>; the data is delivered in deserialized format.
- The groupId identifies a consumer group: records are load-balanced across consumers in the same group, while each separate group receives a full copy of the data. The replication factor, which we can provide while creating the topic, is a separate broker-side setting for fault tolerance.
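To see how these classes fit together outside of Spring, here is a minimal sketch using the raw Kafka client API directly (the broker address matches the connection properties above; the topic name "demo-topic" and the class name are illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PlainKafkaProducerDemo {
    public static void main(String[] args) {
        // connection and serializer properties from the list above
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ProducerRecord combines the topic name and the data to be serialized
            producer.send(new ProducerRecord<>("demo-topic", "hello kafka"));
        }
    }
}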
Spring Boot Integration with Apache Kafka Application Example
- Create a Spring Starter project “SpringBootKafkaEx” with the following dependencies:- Spring Boot DevTools, Lombok, MySQL Driver, Spring Data JPA, Spring Web, and Spring for Apache Kafka.
- Model class:-
package com.knowprogram.demo.model;

import java.util.Date;

import javax.persistence.*; // use jakarta.persistence.* on Spring Boot 3.x

import lombok.Data;

@Entity
@Table(name = "stock_quota_tab")
@Data
public class StockQuote {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    @Column(name = "stk_code_col")
    private String stockCode;

    @Column(name = "stk_cost_col")
    private Double shareValue;

    @Column(name = "stk_dte_col")
    @Temporal(TemporalType.TIMESTAMP)
    private Date serviceDate;
}
- Repository:-
package com.knowprogram.demo.repository;

import org.springframework.data.jpa.repository.JpaRepository;

import com.knowprogram.demo.model.StockQuote;

public interface StockQuoteRepository extends JpaRepository<StockQuote, Integer> { }
- MessageStoreService:-
package com.knowprogram.demo.service;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.knowprogram.demo.model.StockQuote;
import com.knowprogram.demo.repository.StockQuoteRepository;

@Service
public class MessageStoreService {
    @Autowired
    private StockQuoteRepository repository;

    public void addStockData(StockQuote quote) {
        repository.save(quote);
    }

    public List<StockQuote> getAllStockQuotes() {
        return repository.findAll();
    }
}
- Util class to convert an Object to JSON and a JSON String back to an Object:-
package com.knowprogram.demo.util;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.knowprogram.demo.model.StockQuote;

@Component
public class JsonUtil {
    // ObjectMapper is thread-safe once configured, so one shared instance is reused
    private final ObjectMapper mapper = new ObjectMapper();

    public String toJson(StockQuote stockQuote) {
        try {
            return mapper.writeValueAsString(stockQuote);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public StockQuote fromJson(String json) {
        try {
            return mapper.readValue(json, StockQuote.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }
}
- Producer service:-
package com.knowprogram.demo.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.knowprogram.demo.model.StockQuote;
import com.knowprogram.demo.util.JsonUtil;

@Component
public class ProducerService {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerService.class);

    @Value("${my.app.topicName}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> template;

    @Autowired
    private JsonUtil util;

    public void sendData(StockQuote quote) {
        String message = util.toJson(quote);
        LOG.info("At producer received {}", message);
        template.send(topic, message);
    }
}
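Note that template.send() is asynchronous. If delivery confirmation is needed, a callback can be attached to the returned future; a minimal sketch of a variant of sendData(), assuming spring-kafka 3.x where send() returns a CompletableFuture (2.x returns a ListenableFuture instead):

// would live inside ProducerService, reusing its LOG, topic, template, and util fields
public void sendDataWithCallback(StockQuote quote) {
    String message = util.toJson(quote);
    template.send(topic, message).whenComplete((result, ex) -> {
        if (ex != null) {
            LOG.error("Send failed for message {}", message, ex);
        } else {
            // the broker-assigned partition and offset confirm delivery
            LOG.info("Delivered to partition {} at offset {}",
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        }
    });
}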
- Consumer service:-
package com.knowprogram.demo.consumer;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.knowprogram.demo.model.StockQuote;
import com.knowprogram.demo.service.MessageStoreService;
import com.knowprogram.demo.util.JsonUtil;

@Component
public class ConsumerService {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);

    @Autowired
    private MessageStoreService service;

    @Autowired
    private JsonUtil util;

    @KafkaListener(topics = "${my.app.topicName}", groupId = "groupId")
    public void readData(String message) {
        LOG.info("Data at consumer: {}", message);
        StockQuote quote = util.fromJson(message);
        quote.setServiceDate(new Date());
        service.addStockData(quote);
    }
}
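One caveat: if a malformed payload arrives, fromJson() returns null and readData() throws a NullPointerException. A minimal sketch of an explicit retry-then-skip policy, assuming spring-kafka 2.8+ where Spring Boot applies a CommonErrorHandler bean to the listener containers automatically (the package and bean names are illustrative):

package com.knowprogram.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorConfig {
    @Bean
    public DefaultErrorHandler kafkaErrorHandler() {
        // retry each failed record 3 times, 1 second apart, then log and skip it
        return new DefaultErrorHandler(new FixedBackOff(1000L, 3L));
    }
}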
- RestController:-
package com.knowprogram.demo.rest;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import com.knowprogram.demo.model.StockQuote;
import com.knowprogram.demo.producer.ProducerService;
import com.knowprogram.demo.service.MessageStoreService;

@RestController
@RequestMapping("/api/quote")
public class KafkaRestController {
    @Autowired
    private ProducerService producerService;

    @Autowired
    private MessageStoreService storeService;

    // 1. send StockQuote
    @PostMapping("/create")
    public ResponseEntity<String> createStockQuote(@RequestBody StockQuote quote) {
        producerService.sendData(quote);
        return ResponseEntity.ok("Quote Data is sent!");
    }

    // 2. view all received quotes
    @GetMapping("/all")
    public ResponseEntity<List<StockQuote>> getAllQuotes() {
        List<StockQuote> list = storeService.getAllStockQuotes();
        return ResponseEntity.ok(list);
    }
}
- In application.properties:-
spring.application.name=SpringBootKafkaEx
# server details
server.port=8761
# database details
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
# JPA details
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.format_sql=true
# Kafka details
## producer
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
## consumer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
## topic name
my.app.topicName=sample-topic-knowprogram
Kafka topic names must adhere to specific rules:-
- Length: The topic name must be between 1 and 249 characters.
- Valid Characters: Only letters, digits, hyphens (-), underscores (_), and periods (.) are allowed.
- No Spaces: Topic names cannot contain spaces.
- Restricted Names: Certain names may be restricted or reserved by Kafka.
In the properties file, the topic name must not contain any spaces, including leading or trailing ones, or you will get the following error while starting the application:- org.apache.kafka.common.errors.InvalidTopicException: Invalid topics:
Trim any leading or trailing spaces from the topic name and ensure it meets the criteria above.
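Rather than relying on the broker's automatic topic creation, the application can declare the topic itself so the name is fixed in one place. A minimal sketch using spring-kafka's TopicBuilder, which Spring Boot's KafkaAdmin picks up at startup (the package name and the partition/replica counts are illustrative):

package com.knowprogram.demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic sampleTopic() {
        // name follows the rules above: letters, digits, '-', '_', '.'
        return TopicBuilder.name("sample-topic-knowprogram")
                .partitions(3)
                .replicas(1)
                .build();
    }
}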
Execution Steps
Go to the Kafka home folder and use the below commands to start the ZooKeeper and Kafka servers.
- Start ZooKeeper.
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
- Start Kafka server.
.\bin\windows\kafka-server-start.bat .\config\server.properties
- Run the SpringBootKafkaEx application.
- Send data using Postman.
POST http://localhost:8761/api/quote/create
BODY:-{"stockCode": "KP-TECH LTD", "shareValue": 100}
- Check the database table.
USE test;
SELECT * FROM stock_quota_tab;
- Check console.
c.k.demo.producer.ProducerService : At producer received {"id":null,"stockCode":"KP-TECH LTD","shareValue":100.0,"serviceDate":null}
c.k.demo.consumer.ConsumerService : Data at consumer: {"id":null,"stockCode":"KP-TECH LTD","shareValue":100.0,"serviceDate":null}
In real time, in place of Postman, some other application would be sending continuous data.
Understanding the Project
- Setting up ZooKeeper is a must for Kafka. ZooKeeper performs many tasks for Kafka, but in short, we can say that ZooKeeper manages the Kafka cluster state.
- By default, the Kafka server is started on port 9092. Kafka uses ZooKeeper, and hence a Zookeeper server is also started on port 2181.
- What is group Id: Consumer groups give Kafka the flexibility to have the advantages of both message queuing and publish-subscribe models.
- Kafka consumers belonging to the same consumer group share a group ID. The consumers in a group then divide the topic partitions as fairly amongst themselves as possible by establishing that each partition is only consumed by a single consumer from the group.
- If all consumers are from the same group, the Kafka model functions as a traditional message queue would: all the records and processing are load-balanced, and each message is consumed by exactly one consumer of the group. Each partition is connected to at most one consumer from a group (see the sketch after this list).
- KafkaTemplate helps us to send messages to their respective topic.
- All we need to do is call the template's send() method with the topic name and the message as parameters, as sendData() does above.
- When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional publish-subscribe model. The messages are broadcast to all consumer groups.
- In order to prepare the message for transmission from the producer to the broker, we use serializers. The key.serializer and value.serializer properties instruct Kafka how to turn the key and value objects the user provides in their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple byte or string types.
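To make the queue vs. publish-subscribe distinction concrete, here is an illustrative sketch of two listeners on the same topic with different group IDs (the class and group names are assumptions): each group receives every message, while two listeners sharing one groupId would instead split the messages between them.

package com.knowprogram.demo.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class GroupDemoListeners {

    // different groups: each group gets a full copy (publish-subscribe)
    @KafkaListener(topics = "sample-topic-knowprogram", groupId = "audit-group")
    public void audit(String message) {
        System.out.println("audit-group received: " + message);
    }

    @KafkaListener(topics = "sample-topic-knowprogram", groupId = "billing-group")
    public void billing(String message) {
        System.out.println("billing-group received: " + message);
    }
    // if both methods used the SAME groupId, each message would be
    // delivered to only one of them (message-queue behavior)
}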
In the consumer, the listener method can take a ConsumerRecord<String, String> parameter instead of a String, which also exposes the record's key, partition, and offset:-
@KafkaListener(topics = "${my.app.topicName}", groupId = "groupId")
public void listen(ConsumerRecord<String, String> record) {
    // record.key(), record.partition(), and record.offset() are also available
    System.out.println("Received Message: " + record.value());
}
If you enjoyed this post, share it with your friends. Do you want to share more information about the topic discussed above or do you find anything incorrect? Let us know in the comments. Thank you!