Skip to content

Latest commit

 

History

History
109 lines (82 loc) · 4.56 KB

README.md

File metadata and controls

109 lines (82 loc) · 4.56 KB

Kinesis Connector for Structured Streaming

Implementation of Kinesis Source Provider in Spark Structured Streaming. SPARK-18165 describes the need for such implementation.

Developer Setup

git clone [email protected]:qubole/kinesis-sql.git
cd kinesis-sql
mvn install -DskipTests

This will create target/spark-sql-kinesis_2.11-2.2.0.jar file which contains the connector code and its dependency jars.

How to use it

Setup Kinesis

Refer Amazon Docs for more options

Create Kinesis Stream
$ aws kinesis create-stream --stream-name test --shard-count 2
Add Records in the stream
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Kinesis'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Connector'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'for'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Apache'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Spark'

Example Streaming Job

Refering $SPARK_HOME to the Spark installation directory. This library has been developed and tested against SPARK 2.2.x.

Open Spark-Shell
$SPARK_HOME/bin/spark-shell --jars target/spark-sql-kinesis_2.11-2.2.0.jar
Subscribe to Kinesis Source
// Subscribe the "test" stream
scala> :paste
val kinesis = spark
	.readStream
	.format("kinesis")
	.option("streamName", "spark-streaming-example")
   	.option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("awsAccessKeyId", [ACCESS_KEY])
    .option("awsSecretKey", [SECRET_KEY])
    .option("startingposition", "TRIM_HORIZON")
	.load
Check Schema
scala> kinesis.printSchema
root
|-- data: binary (nullable = true)
|-- streamName: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- sequenceNumber: string (nullable = true)
|-- approximateArrivalTimestamp: timestamp (nullable = true)
Word Count
// Cast data into string and group by data column
scala> :paste

	 kinesis
    .selectExpr("CAST(data AS STRING)").as[(String)]
    .groupBy("data").count()
	.writeStream
	.format("console")
    .outputMode("complete") 
	.start()
	.awaitTermination()
Output in Console
+------------+-----+
|        data|count|
+------------+-----+
|         for|    1|
|      Apache|    1|
|       Spark|    1|
|     Kinesis|    1|
|   Connector|    1|
+------------+-----+ 

Kinesis Source Configuration

Option-Name Default-Value Description
streamName - Name of the stream in Kinesis to read from
endpointUrl https://kinesis.us-east-1.amazonaws.com end-point URL for Kinesis Stream
awsAccessKeyId - AWS Credentials for Kinesis describe, read record operations
awsSecretKey - AWS Credentials for Kinesis describe, read record
startingPosition LATEST Starting Position in Kinesis to fetch data from. Possible values are "LATEST" & "TRIM_HORIZON"
describeShardInterval 1s (1 second) Minimum Interval between two DescribeStream API calls to consider resharding
kinesis.executor.maxFetchTimeInMs 1000 Maximum time spent in executor to fetch record from Kinesis per Shard
kinesis.executor.maxFetchRecordsPerShard 100000 Maximum Number of records to fetch per shard
kinesis.executor.maxRecordPerRead 10000 Maximum Number of records to fetch per getRecords API call
kinesis.client.numRetries 3 Maximum Number of retries for Kinesis API requests
kinesis.client.retryIntervalMs 1000 Cool-off period before retrying Kinesis API

Roadmap

  • Above library has been developed and tested against Spark 2.2.x. We need to migrate to DataSource V2 APIs released in Spark 2.3.0
  • Support for Kinesis Sink

Acknowledgement

This connector would not have been possible without reference implemetation of Kafka connector for Structured streaming, Kinesis Connector for Legacy Streaming and Kinesis Client Library. Structure of some part of the code is influenced by the excellent work done by various Apache Spark Contributors.