Introduction
This Article is about Programming Apache Kafka producer and consumer using Java language, as we’ll see, using Java we’ll be able to reproduce what the CLI does and even more.
Prerequisites
- Kafka Installation and configuration article ( To setup cluster will be used in this article)
- Any java programming editor Ex. (Netbeans – IntelliJ idea)
- Basic knowledge about java programming and Kafka
Creating Kafka project
Note: In this article, we assume that the editor is “Intellij IDEA”
- Open your Intellij IDEA
- Create a new “Maven” project with your preferred project name
- Open “Pom.xml” file in your project which contains all your dependencies in your code
- We need to add the Kafka dependencies
So, we will open <Dependencies> </ Dependencies> Tag to add our dependencies to it.
we will add two dependencies the first one is the Kafka dependency and the next one will be a logging dependency.
a. we will go to https://mvnrepository.com/ and search on apache Kafka to get “kafka-clients” Dependency as shown
and open it and select the latest version and copy our maven dependency to our dependencies tag.
b. on the same website we will search on “slf4j simple” and repeat the previous step and copy dependency then add it to dependencies tag what we did in the previous point.
4. our dependencies tag should be like that:
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependencies>
5. in Intellij editor toolbar click on View -> Tool Windows-> Maven and click on the refresh button to load our dependencies
6. Our dependences are loaded successfully let’s go doing some coding!
Creating java producer
- create your producer class with its main function.
- That’s all the imported libraries we will need
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
3. In our main function we will create our properties object to configure our producer properties
Properties properties = new Properties();
you will find out all the properties of the producer in kafka documentation Here
4. We will configure the required properties as shown
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. And it conncted on our zookeeper port “9092”
- key.serializer: Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
- value.serializer: Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
5. So, we will create our producer with our initialized properties
KafkaProducer<String,String> producer= new KafkaProducer<String,String>(properties);
6. Then we will create the record we will pass to our topic with the topic name we created already created
ProducerRecord<String,String> record = new ProducerRecord<String, String>("first_topic",”Hello World!”);
Note: We here sent a Hello World string to our topic named “First Topic”
7. Now, we will send our record to producer
producer.send(record);
and we need to wait until all data are produced so we will add using “flush” method
producer.flush();
then finally, close our producer
producer.close();
Create Java Consumer
- create your consumer class with its main function.
- That’s all the imported libraries we will need
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
3. In our main function we will create our properties object to configure our Consumer properties
Properties properties = new Properties();
you will find out all the properties of the Consumer in Kafka documentation Here.
4. We will configure the required properties as shown
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
- bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. And it conncted on our zookeeper port “9092”
- key.deserializer: Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface.
- value.deserializer: Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface.
- group.id: A unique string that identifies the consumer group this consumer belongs to.
- offset.reset: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer’s group
- anything else: throw exception to the consumer.
8. Now, we will create our Consumer with our initialized properties
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
9. We need to subscribe consumer to our topic
consumer.subscribe(Collections.singleton("first_topic"));
10. Here, we need to poll our data in real time we will use a simple while loop and the poll property We created our records and then poll the data to it the we will show it on the screen by the logger
while (true){
ConsumerRecords<String,String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> record : records){
logger.info(" value: "+ record.value());
}
}
11. Here we go, Run our consumer class and do a simple test by producing any data.
- Start you zookeeper and kafka servers from CLI
- Run your consumer Java Class to start consume new messages
- Run Your Producer java class to produce our “Hello_World!” Message.
- go back to our consumer and you will find our produced message
You can replace our example data here with any data from database table, file, or data from any other stream