Kafka Integration
Steps to Download and Install Kafka
Download the latest version of Kafka from kafka.apache.org.
Unzip the downloaded file; you'll find the following folders inside:
Download and install Offset Explorer for a better view of Kafka messages, partitions, and more. Download it from www.kafkatool.com.
Run Kafka, ZooKeeper, and Offset Explorer
Go to the bin\windows directory, where you can find the Kafka and ZooKeeper batch files.
Inside the config directory you can find the configuration files for ZooKeeper and the Kafka server.
Start ZooKeeper using the command:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
Now let's start the Kafka server using the command:
.\bin\windows\kafka-server-start.bat .\config\server.properties
Create a topic in the Kafka server using the command:
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic bits-to-bytes-topic
List all topics:
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
Add a Cluster in Offset Explorer
Add the cluster
Kafka Producer and Consumer
Start the producer using the command:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic bits-to-bytes-topic
Start the consumer in a new window using the command:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bits-to-bytes-topic --from-beginning
Now send a message from the producer. The consumer should receive the same message.
Let's look in Offset Explorer for a better view:
If you are not able to see the messages, or if the values appear as bytes rather than strings, configure it like this:
Kafka Spring Integration
Create a new project with the following dependencies (at minimum, Spring Web and Spring for Apache Kafka):
Enable Kafka Annotation
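A minimal sketch of what this can look like (the class name is illustrative):

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

// Enables detection of @KafkaListener annotations on Spring-managed beans
@EnableKafka
@Configuration
public class KafkaConfig {
}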
Define Kafka Producer properties:
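A minimal sketch of these properties, assuming a local broker on localhost:9092 and plain String keys and values (the class and bean names are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Basic producer properties: broker address and String serializers
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}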
Producer send Config:
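On top of that factory we can expose a KafkaTemplate bean, which is what gets injected wherever we want to send messages (a sketch; this method would sit in the same configuration class as above):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// KafkaTemplate wraps the producer and handles the actual send calls
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}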
REST API to send a message:
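A sketch of such an endpoint; the request path, parameter, and controller name are illustrative:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Publishes the given text to the topic we created earlier
    @GetMapping("/publish")
    public String publish(@RequestParam String message) {
        kafkaTemplate.send("bits-to-bytes-topic", message);
        return "Message sent to Kafka";
    }
}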
Test the producer:
Start the application and send a message by calling the API we have exposed:
We can see the message being sent in the logs:
Let's check the console consumer in cmd:
Kafka Consumer
Now let's configure the consumer in our application:
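A minimal @KafkaListener sketch, relying on Spring Boot's auto-configuration defaults (broker on localhost:9092, String deserialization); the group id and class name are illustrative:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    // Invoked for every record that arrives on the topic
    @KafkaListener(topics = "bits-to-bytes-topic", groupId = "bits-to-bytes-group")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}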
Let's send another message by calling our exposed endpoint and check the logs to see whether our consumer has received it.
Logs:
Send Serialized Objects
Let's modify our existing consumer to consume objects.
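One way this can look, sketched with Spring Kafka's JsonDeserializer; the Order class it targets is sketched in the next step, and the group id and bean names are illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaOrderConsumerConfig {

    // Deserializes record values from JSON into Order objects
    @Bean
    public ConsumerFactory<String, Order> orderConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bits-to-bytes-group");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(Order.class, false));
    }

    // Listener container factory wired to the Order consumer factory
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> orderListenerFactory(
            ConsumerFactory<String, Order> orderConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory);
        return factory;
    }
}

The listener method then takes an Order instead of a String:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {

    // Receives the deserialized Order object for each record
    @KafkaListener(topics = "bits-to-bytes-topic", groupId = "bits-to-bytes-group",
            containerFactory = "orderListenerFactory")
    public void consume(Order order) {
        System.out.println("Consumed order: " + order);
    }
}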
And let's modify our REST endpoint to send an Order object once the API is called.
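A sketch of the producer side, assuming a simple Order class (its fields here are purely illustrative) and a JSON-serializing KafkaTemplate:

// Illustrative domain object; public fields keep the sketch short
public class Order {
    public String id;
    public String product;
    public int quantity;

    public Order() {
    }

    public Order(String id, String product, int quantity) {
        this.id = id;
        this.product = product;
        this.quantity = quantity;
    }

    @Override
    public String toString() {
        return "Order{id=" + id + ", product=" + product + ", quantity=" + quantity + "}";
    }
}

The JSON-producing template and the endpoint change could then look like this:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaOrderProducerConfig {

    // KafkaTemplate that serializes Order values to JSON
    @Bean
    public KafkaTemplate<String, Order> orderKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}

import java.util.UUID;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    private final KafkaTemplate<String, Order> orderKafkaTemplate;

    public OrderController(KafkaTemplate<String, Order> orderKafkaTemplate) {
        this.orderKafkaTemplate = orderKafkaTemplate;
    }

    // Generates an Order and publishes it when the endpoint is called
    @GetMapping("/publish-order")
    public String publishOrder() {
        Order order = new Order(UUID.randomUUID().toString(), "keyboard", 2);
        orderKafkaTemplate.send("bits-to-bytes-topic", order);
        return "Order sent to Kafka";
    }
}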
Let's test this. Call the REST API; it should generate the Order object and send it to Kafka.
Logs show that the Order object was consumed successfully.
Let's look in our Offset Explorer UI:
Kafka Configs
Compress Kafka messages and set linger.ms and batch.size
linger.ms refers to the time to wait before sending messages out to Kafka. It defaults to 0, which the system interprets as "send messages as soon as they are ready to be sent". batch.size refers to the maximum amount of data to be collected before sending a batch. Kafka producers will send out the next batch of messages whenever either linger.ms or batch.size is met, whichever comes first.
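As a sketch, these settings could be added to the producer properties map built earlier (the values here are only examples, not recommendations):

// Additions to the properties map inside producerFactory():
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compress each batch before sending
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);              // wait up to 20 ms for more records
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);      // or send once ~32 KB has accumulated, whichever comes first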
Logs before setting the config values:
Logs after setting the config values:
By default, the idempotence value (enable.idempotence) is set to true. We can change it to false if needed:
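Sketched as another addition to the same producer properties map:

// Disable idempotent writes (they are enabled by default in recent Kafka clients)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);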
Logs after setting idempotence to false: