This exercise uses the Kafka console as the message producer and Spark Streaming as the message consumer. The steps and code are as follows.

First, start the Kafka server

.\bin\windows\kafka-server-start.bat .\config\

Create a Topic

Here the topic is named test, as an example.

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

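Create a producer

The console producer connects to the broker rather than to ZooKeeper. The command below assumes the broker listens on the default port 9092.

kafka-console-producer.bat --broker-list localhost:9092 --topic test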

Create a Consumer

package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object SparkStreamingKakfaWordCount {
  def main(args: Array[String]): Unit = {
    println("Start to run SparkStreamingKakfaWordCount")
    // local[3]: the receiver occupies one thread, the remaining threads process the data
    val conf = new SparkConf().setMaster("local[3]").setAppName("SparkStreamingKakfaWordCount")
    // Batch interval of 4 seconds
    val ssc = new StreamingContext(conf, Seconds(4))
    // Topic name -> number of consumer threads for that topic
    val topicMap = Map("test" -> 1)
    // ZooKeeper quorum (server list)
    val zkQuorum = "localhost:2181"
    // Consumer group
    val group = "test-consumer-group01"

    // Case 1: topic test has only one partition.
    // createStream yields (key, message) pairs; map(_._2) keeps only the message.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.print()
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    // Output operation, so that the word counts are actually computed and shown
    wordCounts.print()

    // Case 2: topic test has 2 partitions. Spark Streaming creates 2 input DStreams
    // and reads the 2 partitions in parallel. The unioned stream is then repartitioned
    // into 4 partitions and processed in parallel, so the parallelism of the processing
    // logic is twice the parallelism of reading.
    // val streams = (1 to 2).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))
    // Union the 2 streams
    // val partitions = ssc.union(streams).repartition(4).map("DataReceived: " + _)
    // partitions.print()
    // The number of partitions depends on how much parallelism Spark can provide
    // val partitions = ssc.union(streams).repartition(2)
    // val words = partitions.flatMap(_.split(" "))
    // val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    // wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
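
To see what the word count computes for a single batch without starting Kafka or Spark, the same chain of transformations can be tried on ordinary Scala collections. This is only a minimal sketch: the object name WordCountBatchSketch and the helper countWords are made up for illustration, groupBy plus sum stands in for reduceByKey, and the sample batch assumes messages arrive with a null key, as they do when typed into the console producer.

```scala
object WordCountBatchSketch {
  // Hypothetical helper (not part of Spark or Kafka): counts the words in one
  // batch of (key, message) pairs using plain collections.
  def countWords(batch: Seq[(String, String)]): Map[String, Long] =
    batch
      .map(_._2)                  // keep only the message, like map(_._2) on the stream
      .flatMap(_.split(" "))      // split every line into words
      .map(word => (word, 1L))    // pair each word with a count of 1
      .groupBy(_._1)              // local stand-in for reduceByKey: group by word ...
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // ... and sum the counts

  def main(args: Array[String]): Unit = {
    // Assumed sample batch: two lines typed into the console producer
    val batch: Seq[(String, String)] = Seq((null, "hello spark"), (null, "hello kafka"))
    // Print one (word, count) pair per line, sorted by word for readability
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the streaming job the same steps run once per 4-second batch, so the counts printed by wordCounts.print() cover only the messages received in that interval, not the whole history of the topic.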

