Apache Kafka is a scalable, high-performance, low-latency platform that allows reading and writing streams of data like a messaging system. We can start writing Kafka applications in Java fairly easily; see our previous article on how to design a Kafka pipeline in Java.
If you research real-world use cases for Kafka, you will very likely find many that have a data pipeline in which Apache Spark Streaming receives messages from Kafka. That’s why in this article we will see how to integrate the two technologies, combining the processing power of Apache Spark with Kafka.
Kafka and Spark Integration
Starting with Spark 1.3, a new receiver-less “direct” approach (the Direct API) was introduced for configuring Spark Streaming to receive data from Kafka. It ensures stronger end-to-end guarantees than the prior approach, which used receivers to receive the data.
This new direct approach periodically queries Kafka for the latest offsets in each topic and partition and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API, which you may recall can manually handle offsets, is used to read the defined ranges of offsets from Kafka.
Note: There is another useful way to interact with Kafka from Spark. You can use the KafkaUtils.createRDD method to read data from Kafka in a non-streaming Spark job, which makes it possible to run batch jobs on Kafka data from Spark.
Spark Streaming and Kafka Direct Approach
The following steps explain how the direct approach integrates Apache Spark and Kafka:
- Spark periodically queries Kafka to get the latest offsets in each topic and partition that it is interested in consuming from.
- At the beginning of every batch interval, the range of offsets to consume is decided.
- Spark then runs jobs to read the Kafka data that corresponds to the offset ranges determined in the prior step.
- The Kafka simple consumer API is used to read the messages within the defined ranges of offsets.
- The consumed data can then be processed in Spark.
- The offsets are also saved reliably with checkpoints and could be used to re-compute the data in case of a failure.
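The planning step in the list above can be sketched in plain Java. This is only an illustration under stated assumptions, not Spark's implementation: the class OffsetRangePlanner, its Range type, and the "topic-partition" string keys are hypothetical names, and an unseen partition is assumed to start at offset 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of per-batch offset planning; not Spark's actual code.
final class OffsetRangePlanner {

    /** A half-open range [fromOffset, untilOffset) for one topic partition. */
    static final class Range {
        final String topicPartition;
        final long fromOffset;
        final long untilOffset;

        Range(String topicPartition, long fromOffset, long untilOffset) {
            this.topicPartition = topicPartition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        /** Number of records the batch will read from this partition. */
        long count() {
            return untilOffset - fromOffset;
        }

        @Override
        public String toString() {
            return topicPartition + " [" + fromOffset + ", " + untilOffset + ")";
        }
    }

    // Offset up to which each partition has already been planned.
    private final Map<String, Long> consumed = new HashMap<>();

    /**
     * Plans one batch: each partition is read from where the previous batch
     * stopped up to the latest offset reported for that partition.
     */
    List<Range> planBatch(Map<String, Long> latestOffsets) {
        List<Range> ranges = new ArrayList<>();
        // TreeMap only makes the iteration order stable for this demo.
        for (Map.Entry<String, Long> e : new TreeMap<>(latestOffsets).entrySet()) {
            long from = consumed.getOrDefault(e.getKey(), 0L); // assumption: start at 0
            long until = Math.max(from, e.getValue());
            ranges.add(new Range(e.getKey(), from, until));
            consumed.put(e.getKey(), until);
        }
        return ranges;
    }
}
```

Calling planBatch once per batch interval with the latest offsets gives one range per partition, and a partition with no new messages gets an empty range.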
Spark/Kafka Direct Approach Benefits
- Simplified Parallelism: Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume, and they all read data from Kafka in parallel. This one-to-one mapping between Kafka and Spark RDD partitions makes the system easier to understand and work with.
- Efficiency: It removes the need for write-ahead logs to achieve zero data loss.
- Exactly-once semantics: The direct approach uses the Kafka simple consumer client, which gives the control needed to manually track offsets within Spark Streaming checkpoints. This makes it possible to have exactly-once semantics despite failures.
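To see why saved offset ranges are enough to recompute a batch without a write-ahead log, here is a small plain-Java sketch. It assumes a single partition modeled as an append-only list; ReplayableLog is a hypothetical stand-in for a Kafka partition, not a Kafka or Spark class. Because records are retained and never rewritten, reading the same range again returns the same data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one Kafka partition: an append-only log.
final class ReplayableLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a record and returns the offset it was stored at. */
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    /**
     * Reads the half-open range [fromOffset, untilOffset).
     * Later appends never change what an already written range contains,
     * so a range saved in a checkpoint can be replayed after a failure.
     */
    List<String> read(long fromOffset, long untilOffset) {
        return new ArrayList<>(records.subList((int) fromOffset, (int) untilOffset));
    }
}
```

If a batch covering the range [2, 5) fails halfway, a retry that reads [2, 5) again sees exactly the same records, even if new messages arrived in the meantime. Replaying the input is only half of the story, though: for end-to-end exactly-once results, the output step also has to be idempotent or transactional.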
Reading from Kafka with Spark Streaming (Example)
Let’s take a look at how we can write code that allows Spark Streaming to read data from Kafka.
We first need to add the necessary Maven dependencies (see https://maven.apache.org/) to the project's pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>
Then we need to import the necessary classes from org.apache.spark.streaming.kafka010 (the package that ships with the spark-streaming-kafka-0-10 artifact above), along with any other libraries your application needs.
And now we will start our configurations:
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Spark streaming with kafka ");
Then we set the batch interval to 5 seconds:
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
Kafka parameters (properties) configurations:
final Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
// Deserializers turn the raw bytes of each record back into Strings.
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
// key.serializer and value.serializer are producer settings; a consumer does not need them.
kafkaParams.put("group.id", "DEFAULT_GROUP_ID");
// Start from the newest messages when the group has no committed offset.
kafkaParams.put("auto.offset.reset", "latest");
// Do not let the Kafka consumer commit offsets automatically.
kafkaParams.put("enable.auto.commit", false);
Next, we list the name of the topic (or topics), already created in Kafka, that we want to subscribe to:
Collection<String> topics = Arrays.asList("kafkaSpark");
Once we have all of that code in place, we use KafkaUtils.createDirectStream to create our stream and subscribe to our Kafka topics:
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );
The call to createDirectStream returns a JavaInputDStream of ConsumerRecord objects. Each record exposes the Kafka message’s key and value (through key() and value()), along with its topic, partition, and offset.
It is also possible to control where the stream starts. ConsumerStrategies.Subscribe has an overload that takes an additional map of starting offsets per topic partition, so you can begin consuming records from any arbitrary offset.
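Now your input DStream, called stream, receives the data from the Kafka topic, so you can process it with Spark functions such as map, flatMap, mapToPair, and reduceByKey. Note that nothing is consumed until you start the streaming context with jssc.start() and keep it running with jssc.awaitTermination().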
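To make the per-batch logic concrete without a running cluster, the sketch below applies the same kind of steps (split into words, pair each word with 1, sum per key) to the message values of one batch using plain Java collections. It is an analogue, not Spark code: BatchWordCount and its count method are hypothetical names, and in a real job the equivalent steps would be expressed with flatMap, mapToPair, and reduceByKey on the stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogue of a word count over the values of one batch.
final class BatchWordCount {

    /** Counts how often each whitespace-separated word occurs in the batch. */
    static Map<String, Integer> count(List<String> messageValues) {
        return messageValues.stream()
                // Like flatMap: one message value becomes many words.
                .flatMap(value -> Arrays.stream(value.trim().split("\\s+")))
                // Blank messages produce an empty token; drop it.
                .filter(word -> !word.isEmpty())
                // Like mapToPair + reduceByKey: (word, 1) pairs summed per word.
                .collect(Collectors.toMap(
                        word -> word,
                        word -> 1,
                        Integer::sum,
                        TreeMap::new));
    }
}
```

For a batch with the two values "spark kafka spark" and "kafka stream", the result maps kafka to 2, spark to 2, and stream to 1.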