Apache Kafka + Spark streaming + Apache Kafka Integration example


Welcome to the real-time processing example series. In this tutorial, I will explain a simple integration example: Apache Kafka + Spark streaming + Apache Kafka.

Kafka + Spark streaming + Kafka

What is Apache Kafka?

Apache Kafka is a distributed messaging system based on the publish-subscribe model. Apache Kafka is not only a messaging system but also a stream processor that can react to incoming messages (thanks to the Kafka Streams API) and a fault-tolerant, reliable data store. Apache Kafka is mainly used in real-time data processing and analytics. It is efficient at handling huge volumes of data in real time, in a fault-tolerant way, with high throughput, and it is considered reliable and efficient when compared to other messaging systems.

Recently, Apache Kafka has gained a lot of attention and momentum in the big data space, especially in real-time analytics. Many companies have started including Apache Kafka in their data pipelines as their messaging system. Although other messaging systems such as ActiveMQ, RabbitMQ and TIBCO EMS exist, Apache Kafka is widely considered to win hands down in terms of throughput, fault tolerance, stream processing, data storage and other features.

What are Apache Spark and Spark streaming?

Apache Spark is an open-source, high-speed, near-real-time, fault-tolerant, in-memory cluster computing framework. It is widely adopted in many companies to implement batch and real-time analytics. Many relied on MapReduce for batch computation, but due to its high latency and long processing times, Apache Spark has become a replacement. Apache Spark is up to 100 times faster than MapReduce, as per the official documentation. The reason is that it does its computation in memory, so the need for writing to disk is reduced. Hence there are fewer I/O operations compared to MapReduce, and the effect is low latency and high throughput.

Apache Spark streaming is a sub-module of Apache spark for building real-time streaming applications.

Note: the Apache Spark core module is for batch processing, and the Apache Spark streaming module is for real-time processing.

Spark streaming is built on top of the Spark SQL engine, which means you can execute SQL commands on top of your incoming data. Spark streaming is considered a fault-tolerant, scalable, real-time processing engine.

One thing to note is that Spark streaming processes the data in micro-batches. Strictly speaking, it is near real-time rather than real-time, but latency stays low and performance is strong.

As per official documentation,

Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees
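
To see the micro-batch model in action, here is a minimal, self-contained sketch, separate from this tutorial's pipeline. It reads Spark's built-in "rate" test source and uses a processing-time trigger to fire a micro-batch every second (the object name MicroBatchSketch is just for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object MicroBatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MicroBatchSketch").master("local[*]").getOrCreate()

    //The built-in "rate" source keeps generating (timestamp, value) rows for testing
    val stream = spark.readStream.format("rate").load()

    //Every second, the rows that arrived since the last trigger are processed as one small batch
    stream.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()
      .awaitTermination()
  }
}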

If you want to learn more about Apache Spark, please check out my tutorial on Apache Spark – a complete guide.

Apache Kafka + Spark streaming + Apache Kafka Integration example

In this example, we will have Apache Kafka as our streaming data source. We will post continuous messages to an Apache Kafka topic named “source-topic”. Spark streaming will be our stream processing engine, listening to the Kafka “source-topic”. As soon as it receives a message, it will emit that message to another Kafka topic called “target-topic”. It is a simple integration example to understand the flow of events. Our main goal here is to see whether the messages are consumed via the Spark streaming job and pushed to the target topic. I assure you that you can easily understand the API used and how events flow through our data pipeline. In another example, we will also see some aggregations on our incoming events.

  • Apache Kafka – Streaming data source
  • Apache Spark streaming – Stream processing engine
  • Apache Kafka – Data storage

Step 1: Prerequisites

  1. Java 8+
  2. Ubuntu/Windows. I will be running the examples on Ubuntu.
  3. Apache Kafka should be installed. Please find the guide/steps to install Apache Kafka on your machine.
  4. Apache Spark should be installed. Please find the guide/steps to install Apache Spark on your machine.

Step 2: Create “source-topic” in Apache Kafka

  1. Make sure Apache ZooKeeper is up and running.
  2. Make sure the Apache Kafka broker is up and running.
  3. Commands to list the existing topics and create the new topic:

//List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

//Create topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic source-topic

Created topic source-topic.

Step 3: Create “target-topic” in Apache Kafka

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic target-topic

Created topic target-topic.

Step 4: List topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

source-topic
target-topic

So you are now ready with the source and target topics.
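
By the way, if you prefer creating topics from code instead of the shell scripts, the Kafka AdminClient API can do the same thing. A sketch is below; note my assumption that you are on kafka-clients 0.11 or later (AdminClient does not exist in the 0.10.0.1 dependency used in the pom.xml later in this tutorial), and the object name CreateTopics is just for illustration:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)

    //Same settings as the CLI steps: 1 partition, replication factor 1
    val topics = Arrays.asList(
      new NewTopic("source-topic", 1, 1.toShort),
      new NewTopic("target-topic", 1, 1.toShort))

    admin.createTopics(topics).all().get() //blocks until both topics are created
    admin.close()
  }
}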

Step 5: Spark Streaming code

Initialize Spark Session.

val spark = SparkSession.builder().appName("Kafka-SparkStreaming-Kafka").master("local[*]").getOrCreate()

Read from Kafka source-topic.

val df = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("subscribe", "source-topic")
     .load()
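
One thing worth knowing here: the Kafka source exposes key and value as binary columns. For our pass-through example that is fine, because the Kafka sink accepts a binary value column as-is. But if you want to inspect or transform the payload, cast it to a string first; a small sketch:

//Cast the binary key/value columns to strings for inspection or transformation
val messages = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")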

Write to Kafka target-topic.

df
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "target-topic")
  .option("checkpointLocation", "/home/rajasekar.sribalan/tmp/")
  .start().awaitTermination()
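
Two notes on the sink. First, the checkpointLocation option is mandatory for the Kafka sink; Spark uses that directory to track offsets so the query can recover after a failure. Second, while developing you can swap the Kafka sink for the built-in console sink to see exactly what the job reads; a quick debugging sketch:

//Debugging sketch: print each micro-batch to stdout instead of writing to Kafka
df.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()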

Complete code

package realtime.dataprocessing.sparkstreaming

import org.apache.spark.sql.SparkSession

object KafkaSparkStreamingKafka {
  def main(args: Array[String]): Unit = {

    //Spark Session
    val spark = SparkSession.builder().appName("Kafka-SparkStreaming-Kafka").master("local[*]").getOrCreate()

    //Read Stream
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "source-topic")
      .load()

    //WriteStream
    df
      .writeStream // use `write` for batch, like DataFrame
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "target-topic")
      .option("checkpointLocation", "/home/rajasekar.sribalan/tmp/")
      .start().awaitTermination()
  }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>realtime.dataprocessing</groupId>
 <artifactId>sparkstreaming</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>

 <name>sparkstreaming</name>
 <url>http://maven.apache.org</url>

 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>

 <dependencies>

  <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.11</artifactId>
   <version>2.3.4</version>
  </dependency>

  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.10.0.1</version>
  </dependency>

  </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <version>2.11</version>
    <executions>
     <execution>
      <goals>
       <goal>compile</goal>
       <goal>testCompile</goal>
      </goals>
     </execution>
    </executions>

   </plugin>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.6</version>
    <configuration>
     <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
     </descriptorRefs>
     <archive>
      <manifest>
       <mainClass>realtime.dataprocessing.sparkstreaming.KafkaSparkStreamingKafka</mainClass>
      </manifest>
     </archive>
    </configuration>
    <executions>
     <execution>
      <phase>package</phase>
      <goals>
       <goal>single</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
  </plugins>
 </build>
</project>

Step 6: Run the Spark streaming code

Note: the --packages flag downloads the spark-sql-kafka-0-10 connector (which we did not bundle into our jar) from Maven at submit time; its Scala version (2.11) and Spark version (2.3.4) must match your build.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 --class realtime.dataprocessing.sparkstreaming.KafkaSparkStreamingKafka --master local[*] --executor-memory 2G --num-executors 4 sparkstreaming-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Step 7: Start Kafka Producer

Now that your Spark streaming job is up and running, it is time to start posting messages to “source-topic” in Kafka. To do that, we need to start the Kafka console producer.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source-topic

Now I am posting a few messages to the topic:

>Hi
>Hi
>Raj
>Sekar
>How
>are
>you
>I am testing you
>hope you are processing
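
If you would rather drive the test from code, here is a sketch using the Kafka producer API; the object name TestProducer and the message values are my own choices for illustration:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    //Post a few test messages to source-topic
    Seq("Hi", "Raj", "Sekar", "How", "are", "you").foreach { msg =>
      producer.send(new ProducerRecord[String, String]("source-topic", msg))
    }

    producer.close() //flushes any buffered messages before exiting
  }
}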

So we have now posted messages to the source-topic. Please check your Spark job console for any errors. By this time, your messages would have been processed by the Spark streaming job and written to “target-topic”. Let us now check “target-topic”.

Step 8: Verify results in Kafka target-topic (integration test)

Let us now verify the results by consuming the messages from “target-topic” via Kafka Consumer.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic target-topic --from-beginning
Hi
Hi
Raj
Sekar
How
are
you
I am testing you
hope you are processing

As you can see, the messages are available in the target-topic.

Thus our Spark streaming and Kafka integration example has worked successfully. It is a simple example to help you understand the Apache Kafka + Spark streaming + Apache Kafka integration.

In subsequent tutorials, I will post some examples with aggregation. Stay tuned!

I hope this tutorial was useful. Please provide your feedback and leave us a thumbs up!

References: Apache Spark official documentation.
