Data science Software Course Training in Ameerpet Hyderabad

Tuesday 16 May 2017

Spark : Spark streaming and Kafka Integration

Steps:

 1) Start the ZooKeeper server.
 2) Start the Kafka brokers [ one or more ].
 3) Create a topic.
 4) Start a console producer [ to write messages into the topic ].
 5) Start a console consumer [ to test whether messages are streamed ].
 6) Create a Spark streaming context
    that streams from the Kafka topic.
 7) Perform transformations or aggregations.
 8) Output operation: direct the results into another Kafka topic.
------------------------------------------

The following code was tested with
  Spark 1.6.0 and Kafka 0.10.2.0.

Kafka and Spark streaming

bin/zookeeper-server-start.sh  config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

// sc is the SparkContext provided by spark-shell; the batch interval is 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))
//1.
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
// each element is a (key, message) pair; keep the message and upper-case it
val lines = kafkaStream.map(x => x._2.toUpperCase)

// flatMap, so that every word becomes its own element
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x,1))
val wc = pair.reduceByKey(_+_)

wc.print()
// to write the results into a kafka topic, add the code below before calling ssc.start
ssc.start
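
To see what the transformations do to one batch, here is a minimal sketch on plain Scala collections, with no Spark or Kafka involved. The object name WordCountBatchSketch and the helper countWords are hypothetical, and the sketch assumes every line is split into individual words before counting.

```scala
object WordCountBatchSketch {
  // Hypothetical helper: mirrors the streaming steps on one in-memory batch.
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .map(_.toUpperCase)          // same as kafkaStream.map(x => x._2.toUpperCase)
      .flatMap(_.split(" "))       // one element per word
      .filter(_.nonEmpty)          // drop empty strings left by repeated spaces
      .map(w => (w, 1))            // pair each word with a count of 1
      .groupBy(_._1)               // local stand-in for reduceByKey
      .map { case (w, ps) => (w, ps.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("spark kafka spark", "kafka streams")
    countWords(batch).toSeq.sortBy(_._1).foreach {
      case (w, c) => println(w + "\t" + c)
    }
  }
}
```

With map instead of flatMap, each element would be a whole Array[String], and arrays do not work as keys for counting.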

------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1

// writing into kafka topic.

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord


wc.foreachRDD(rdd =>
      rdd.foreachPartition { partition =>
        // create one producer per partition, not one per record
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String,String](props)

        partition.foreach {
          // wc holds (word, count) pairs; the count is an Int
          case (w, cnt) =>
            val x = w+"\t"+cnt
            println(x)
            val message = new ProducerRecord[String, String]("results1",null,x)
            producer.send(message)
        }
        // flush pending messages and release the connection
        producer.close()
      })

-- Execute the above code before calling ssc.start. Once the streaming context has started, no new output operations can be added to it.
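
The producer settings and the record format can also be looked at in isolation. The following is only a sketch without the Kafka client on the classpath: the object ProducerSettingsSketch and its two helpers are hypothetical names, and the plain string keys are the values behind the ProducerConfig constants used above.

```scala
import java.util.HashMap

object ProducerSettingsSketch {
  // Hypothetical helper: builds the same settings with plain string keys.
  // "bootstrap.servers", "key.serializer" and "value.serializer" are the
  // values of the ProducerConfig constants.
  def producerProps(brokers: String): HashMap[String, Object] = {
    val serializer = "org.apache.kafka.common.serialization.StringSerializer"
    val props = new HashMap[String, Object]()
    props.put("bootstrap.servers", brokers)
    props.put("key.serializer", serializer)
    props.put("value.serializer", serializer)
    props
  }

  // Formats one (word, count) pair as a tab-separated message value.
  def formatRecord(word: String, count: Int): String = word + "\t" + count

  def main(args: Array[String]): Unit = {
    println(producerProps("localhost:9092"))
    println(formatRecord("SPARK", 2))
  }
}
```

Keeping the settings in one helper makes it easier to build them once per partition rather than once per record.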
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning

-------------------
 val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))

1. KafkaUtils.createStream()
   needs 4 arguments:
    1st ---> streaming context
    2nd ---> ZooKeeper details (host:port)
    3rd ---> consumer group id
    4th ---> topics
 
Spark streaming can read from multiple topics.
    The topics are passed as key-value pairs of a Map object:

key ---> topic name
value ---> number of consumer threads

 To read from multiple topics,
 the 4th argument should be as follows:
    Map("t1"->2,"t2"->4,"t3"->1)
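
When the topic list comes from a setting rather than from code, the Map can be built from a string. This is a minimal sketch: the helper parseTopics and the "name:threads" format are assumptions made for illustration, not part of the Spark or Kafka API.

```scala
object TopicMapSketch {
  // Hypothetical helper: turns "t1:2,t2:4,t3:1" into Map("t1" -> 2, "t2" -> 4, "t3" -> 1).
  // A malformed entry (no ':' or a non-numeric thread count) throws an exception.
  def parseTopics(spec: String): Map[String, Int] =
    spec.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
      val Array(name, threads) = entry.split(":")
      (name.trim, threads.trim.toInt)
    }.toMap

  def main(args: Array[String]): Unit = {
    val topics = parseTopics("t1:2,t2:4,t3:1")
    // topics has the type expected by the 4th argument of createStream
    println(topics)
  }
}
```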

-------------------------

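   The given number of consumer threads applies to each receiver that createStream creates.

   Ex: the topic has 3 partitions,
        and the consumer threads are 5.
   With one stream per partition, the total number of threads = 3 x 5 = 15.

But a single createStream call creates only one receiver, so these 15 threads do not all run in parallel.

At a time, only the 5 threads of that one receiver are running.

To make all (15) run in parallel, create one stream (receiver) per partition: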

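val numparts = 3
// one input stream (receiver) per partition; the block must return the stream
val kstreams = (1 to numparts).map{x =>
    KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
          }
// merge the streams into a single DStream before applying transformations
val unioned = ssc.union(kstreams)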
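
The thread arithmetic above can be checked with a small sketch in plain Scala. The names ReceiverPlanSketch, plan and totalThreads are hypothetical; the sketch only counts requested threads and does not model how Kafka assigns partitions to them.

```scala
object ReceiverPlanSketch {
  // Hypothetical helper: one topic map per receiver, as passed to createStream.
  def plan(numReceivers: Int, topic: String, threadsPerReceiver: Int): Seq[Map[String, Int]] =
    (1 to numReceivers).map(_ => Map(topic -> threadsPerReceiver))

  // Total number of consumer threads requested across all receivers.
  def totalThreads(p: Seq[Map[String, Int]]): Int =
    p.map(_.values.sum).sum

  def main(args: Array[String]): Unit = {
    val single = plan(1, "spark-topic", 5)
    val perPartition = plan(3, "spark-topic", 5)
    println("one receiver: " + totalThreads(single) + " threads")
    println("three receivers: " + totalThreads(perPartition) + " threads")
  }
}
```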