// streaming.scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark._
import org.apache.spark.streaming._
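
// Kafka consumer settings: broker list, string deserializers for keys and values,
// consumer group id, start from the latest offsets, and offset auto-commit disabled.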
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "10.30.3.41:9092,10.30.3.42:9092,10.30.3.43:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "123",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)

// Topics come from the custom "spark.kafka.streaming.topics" property, as a comma-separated list.
val topics = sc.getConf.get("spark.kafka.streaming.topics").split(",")

// Micro-batch interval of 10 seconds, reusing the existing SparkContext (e.g. from spark-shell).
val ssc = new StreamingContext(sc, Seconds(10))
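
// Direct stream: Kafka partitions are distributed evenly across executors (PreferConsistent),
// subscribing to the configured topics with the parameters above.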
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
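
// Project each ConsumerRecord down to a (key, value) pair.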
val myStream = stream.map(record => (record.key, record.value))
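
// Not part of the original snippet: the streaming context has to be started (and,
// typically, awaited) before any records are actually consumed. A minimal completion,
// assuming the (key, value) tuples are only printed for inspection:
myStream.print()
ssc.start()
ssc.awaitTermination()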