This exercise uses the Kafka console as the message producer and Spark Streaming as the message consumer. The specific steps and code are as follows.

First, start the Kafka server

.\bin\windows\kafka-server-start.bat .\config\

Create a Topic

Here the topic is named test, as an example.

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

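Create a producer

The console producer connects to the Kafka broker rather than to ZooKeeper; the broker address localhost:9092 is the default listener and is assumed here.

kafka-console-producer.bat --broker-list localhost:9092 --topic test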

Create a Consumer
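
The consumer below imports org.apache.spark.streaming.kafka, which ships in a separate Kafka integration artifact rather than in Spark core. A minimal build.sbt sketch follows. The Scala and Spark versions are assumptions for illustration and are not taken from this article; on Spark 2.x the receiver-based artifact is named spark-streaming-kafka-0-8 instead.

```scala
// build.sbt (sketch; all version numbers are assumptions, adjust to your cluster)
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Spark Streaming itself
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  // Receiver-based Kafka integration that provides KafkaUtils.createStream
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)
```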

package spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object SparkStreamingKakfaWordCount {
  def main(args: Array[String]): Unit = {
    println("Start to run SparkStreamingKakfaWordCount")
    val conf = new SparkConf().setMaster("local[3]").setAppName("SparkStreamingKakfaWordCount")
    // One batch every 4 seconds
    val ssc = new StreamingContext(conf, Seconds(4))
    // Topic name -> number of consumer threads in the receiver
    val topicMap = Map("test" -> 1)
    // ZooKeeper quorum server list
    val zkQuorum = "localhost:2181"
    // Consumer group
    val group = "test-consumer-group01"

    // The following assumes topic test has only one partition
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.print()
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // The following assumes topic test has 2 partitions: Spark Streaming creates
    // 2 input DStreams and reads the 2 partitions in parallel.
    // Spark Streaming then repartitions the data into 4 partitions for parallel
    // processing, so the processing parallelism is twice the read parallelism.
    // val streams = (1 to 2).map(_ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))
    // Union the 2 streams
    // val partitions = ssc.union(streams).repartition(4).map("DataReceived: " + _)
    // partitions.print()
    // Alternatively; the number of partitions depends on Spark's capacity for parallel processing
    // val partitions = ssc.union(streams).repartition(2)
    // val words = partitions.flatMap(_.split(" "))
    // val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    // wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
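
To see what the flatMap, map and reduceByKey steps compute for a single batch, the same logic can be tried on plain Scala collections without Spark or Kafka. This is only a sketch: WordCountSketch and countWords are hypothetical names introduced for illustration, and groupBy plus sum stands in for reduceByKey, which exists only on Spark's pair DStreams and RDDs.

```scala
object WordCountSketch {
  // Counts the words in one batch of lines, mirroring the streaming pipeline:
  // flatMap(_.split(" ")) -> map(x => (x, 1L)) -> reduceByKey(_ + _)
  def countWords(batch: Seq[String]): Map[String, Long] =
    batch
      .flatMap(_.split(" "))          // split each line into words
      .map(word => (word, 1L))        // pair every word with a count of 1
      .groupBy(_._1)                  // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum per word

  def main(args: Array[String]): Unit = {
    // Two lines as they might be typed into the console producer
    val batch = Seq("hello spark", "hello kafka")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the streaming job, the same counting is applied independently to each 4-second batch, so the counts are per batch and do not accumulate across batches.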
