Kafka Integration

Steps to Download and Install Kafka

Download the latest version of Kafka from kafka.apache.org.

Unzip the downloaded file. You'll find the following folders inside:

Download and install Offset Explorer for a better view of Kafka messages, partitions and more. It is available from www.kafkatool.com.

Run Kafka, ZooKeeper and Offset Explorer

Go to the bin\windows directory, where you can find the Kafka and ZooKeeper batch files.

The config directory contains the configuration files for ZooKeeper and the Kafka server.

Start ZooKeeper using the command:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Now let's start the Kafka server using the command:

.\bin\windows\kafka-server-start.bat .\config\server.properties

Create a topic on the Kafka server using the command:

.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic bits-to-bytes-topic

List all topics:

.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

Add Cluster in Offset Explorer

Add the cluster

Kafka Producer Consumer

Start the producer using the command:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic bits-to-bytes-topic

Start the consumer in a new window using the command:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bits-to-bytes-topic --from-beginning

Now send a message from the producer. The consumer should receive the same message.

Let's look at it in Offset Explorer for a better view:

If you cannot see the messages, or if the values are shown as bytes rather than strings, configure it like this:

Kafka Spring Integration

Create a new project with the following dependencies:

Enable Kafka Annotation

Define Kafka Producer properties:
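As a minimal sketch of what such producer properties might contain, assuming the local broker at localhost:9092 used above and plain String keys and values: the map below uses Kafka's standard configuration keys. The class and method names (ProducerProps, build) are hypothetical. In a Spring application a map like this is typically handed to a DefaultKafkaProducerFactory, which in turn backs a KafkaTemplate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects producer settings under Kafka's standard config keys.
class ProducerProps {

    static Map<String, Object> build(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // Broker(s) the producer connects to first, e.g. "localhost:9092".
        props.put("bootstrap.servers", bootstrapServers);
        // Keys and values are sent as plain strings (an assumption for this sketch).
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

Calling ProducerProps.build("localhost:9092") returns the three settings a basic String producer needs.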

Producer send Config:

Rest API to send message:

Test the producer:

Start the application and send a message by calling the API we exposed:

We can see the sent message in the logs:

Let's check our console consumer in the command prompt:

Kafka Consumer

Now let's configure the consumer in our application:
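A consumer configuration could be sketched along the following lines, again using Kafka's standard configuration keys. The group id passed in and the class and method names (ConsumerProps, build) are assumptions for illustration. In Spring, such a map is usually passed to a DefaultKafkaConsumerFactory, and the listener method is annotated with @KafkaListener.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects consumer settings under Kafka's standard config keys.
class ConsumerProps {

    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        // Consumers sharing a group id split the topic's partitions between them.
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // For a group with no committed offsets, start from the oldest message,
        // similar in spirit to --from-beginning on the console consumer.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```

For example, ConsumerProps.build("localhost:9092", "bits-to-bytes-group") would describe a String consumer for the local broker; the group name here is made up.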

Let's send another message by calling our exposed endpoint and check the logs to see whether our consumer received it.

Logs:

Send serialized Objects

Let's modify our existing consumer to consume objects.

And let's modify our REST endpoint to send an Order object when the API is called.
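To illustrate the round trip an object makes (producer turns it into bytes, consumer turns the bytes back into an object), here is a small sketch. The fields of Order are hypothetical, since the post does not list them, and Java's built-in serialization is used only to keep the example dependency-free. It is a stand-in, not the serializer used in this project; with Spring Kafka, JsonSerializer and JsonDeserializer are a common choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical Order payload; the field names are assumptions.
class Order implements Serializable {
    private static final long serialVersionUID = 1L;
    final String orderId;
    final String item;
    final int quantity;

    Order(String orderId, String item, int quantity) {
        this.orderId = orderId;
        this.item = item;
        this.quantity = quantity;
    }
}

// Stand-in codec: what a value serializer/deserializer pair does conceptually.
class OrderCodec {

    // Producer side: object -> bytes.
    static byte[] serialize(Order order) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(order);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Consumer side: bytes -> object.
    static Order deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Order) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Order class not found", e);
        }
    }
}
```

Both sides must agree on the format: a consumer configured with a String deserializer would see only raw bytes for a message written this way.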

Let's test this. Call the REST API; it should generate the Order object and send it to Kafka.

Logs show that the Order object was consumed successfully.

Let's look at it in the Offset Explorer UI:

Kafka Configs

Compress Kafka messages and set linger.ms and batch.size

linger.ms is the time the producer waits before sending messages to Kafka. It defaults to 0, which the system interprets as "send messages as soon as they are ready to be sent". batch.size is the maximum amount of data, in bytes, to collect before sending a batch. The producer sends the next batch as soon as either linger.ms or batch.size is reached, whichever comes first.
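The "whichever comes first" rule can be sketched as follows. The config keys (linger.ms, batch.size, compression.type) are Kafka's own; the values are arbitrary examples, and shouldSend is a hypothetical simplification of what the producer does internally, not its actual code.

```java
import java.util.HashMap;
import java.util.Map;

class BatchingSketch {

    // Example producer settings; the values are assumptions chosen for illustration.
    static Map<String, Object> batchingProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("linger.ms", 20);             // wait up to 20 ms for more records
        props.put("batch.size", 32768);         // or until 32 KB have been collected
        props.put("compression.type", "gzip");  // compress each batch before sending
        return props;
    }

    // Simplified rule: a batch goes out once it is full OR has waited long enough.
    static boolean shouldSend(long batchedBytes, long waitedMs,
                              long batchSizeBytes, long lingerMs) {
        return batchedBytes >= batchSizeBytes || waitedMs >= lingerMs;
    }
}
```

With lingerMs set to 0 the second condition is always true, which matches the default behaviour of sending as soon as a record is ready. A larger linger.ms trades a little latency for fuller batches, which also tends to make compression more effective.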

Logs before setting the config values:

Logs after setting the config values:

By default, idempotence is enabled (enable.idempotence defaults to true in Kafka 3.0 and later clients). We can set it to false if needed:
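One way to express this, as a sketch: copy the existing producer properties and override the enable.idempotence key. The helper name withIdempotence is hypothetical; the config key is Kafka's own.

```java
import java.util.HashMap;
import java.util.Map;

class IdempotenceProps {

    // Returns a copy of the given producer properties with idempotence switched on or off.
    static Map<String, Object> withIdempotence(Map<String, Object> base, boolean enabled) {
        Map<String, Object> props = new HashMap<>(base); // leave the caller's map untouched
        props.put("enable.idempotence", enabled);
        return props;
    }
}
```

Disabling idempotence means that producer retries can write duplicate messages, so it is worth doing only when there is a concrete reason.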

Logs after setting idempotence to false:
