From 379e80d55a4c4feb19674215ea2de41c082030d0 Mon Sep 17 00:00:00 2001
From: Yamin Lin
Date: Mon, 6 May 2024 19:13:58 -0700
Subject: [PATCH] add spark session and data

---
 docs/PySpark/structured-streaming-pyspark.md  | 40 ++++++++++++++
 ...uctured-streaming-eventhubs-integration.md | 54 +++++++++++++++++++
 2 files changed, 94 insertions(+)

diff --git a/docs/PySpark/structured-streaming-pyspark.md b/docs/PySpark/structured-streaming-pyspark.md
index 39088b669..b981d1cb1 100644
--- a/docs/PySpark/structured-streaming-pyspark.md
+++ b/docs/PySpark/structured-streaming-pyspark.md
@@ -237,6 +237,11 @@ ehConf = {
   'eventhubs.connectionString' : connectionString
 }
 
+# Build a Spark session
+spark = SparkSession.builder \
+  .appName("EventHubWriter") \
+  .getOrCreate()
+
 df = spark \
   .readStream \
   .format("eventhubs") \
@@ -253,6 +258,11 @@ ehConf = {
   'eventhubs.connectionString' : connectionString
 }
 
+# Build a Spark session
+spark = SparkSession.builder \
+  .appName("EventHubWriter") \
+  .getOrCreate()
+
 # Simple batch query
 df = spark \
   .read \
@@ -310,6 +320,21 @@ ehWriteConf = {
   'eventhubs.connectionString' : writeConnectionString
 }
 
+# Build a Spark session
+spark = SparkSession.builder \
+  .appName("EventHubWriter") \
+  .getOrCreate()
+
+# Create sample data
+columns = ["body", "num"]
+data = [("hello", "10"), ("azure", "10"), ("data", "10")]
+
+for a in range(50001):
+    data.append(("hello", "2"))
+
+rdd = spark.sparkContext.parallelize(data)
+df = spark.createDataFrame(rdd, columns)
+
 # Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
 ds = df \
   .select("body") \
@@ -338,6 +363,21 @@ ehWriteConf = {
   'eventhubs.connectionString' : writeConnectionString
 }
 
+# Build a Spark session
+spark = SparkSession.builder \
+  .appName("EventHubWriter") \
+  .getOrCreate()
+
+# Create sample data
+columns = ["body", "num"]
+data = [("hello", "10"), ("azure", "10"), ("data", "10")]
+
+for a in range(50001):
+    data.append(("hello", "2"))
+
+rdd = spark.sparkContext.parallelize(data)
+df = spark.createDataFrame(rdd, columns)
+
 # Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
 ds = df \
   .select("body") \
diff --git a/docs/structured-streaming-eventhubs-integration.md b/docs/structured-streaming-eventhubs-integration.md
index 3856a1023..c423a9946 100644
--- a/docs/structured-streaming-eventhubs-integration.md
+++ b/docs/structured-streaming-eventhubs-integration.md
@@ -205,6 +205,14 @@ val connectionString = ConnectionStringBuilder("YOUR.EVENTHUB.COMPATIBLE.ENDPOIN
 val connectionString = "Valid EventHubs connection string."
 val ehConf = EventHubsConf(connectionString)
 
+// Build a Spark session
+val spark =
+  SparkSession
+    .builder
+    .appName("Event Hub Spark App")
+    .config("spark.master", "local")
+    .getOrCreate()
+
 val df = spark
   .readStream
   .format("eventhubs")
@@ -239,6 +247,14 @@ val df = spark
 ### Creating an Event Hubs Source for Batch Queries
 
 ```scala
+// Build a Spark session
+val spark =
+  SparkSession
+    .builder
+    .appName("Event Hub Spark App")
+    .config("spark.master", "local")
+    .getOrCreate()
+
 // Simple batch query
 val df = spark
   .read
@@ -315,6 +331,25 @@ Users can also provided properties via a `map[string, string]` if they would lik
 ### Creating an EventHubs Sink for Streaming Queries
 
 ```scala
+// Build a Spark session
+val spark =
+  SparkSession
+    .builder
+    .appName("Event Hub Spark App")
+    .config("spark.master", "local")
+    .getOrCreate()
+
+// Create sample data
+val columns = Seq("body", "num")
+var data = Seq(("hello", "10"), ("azure", "10"), ("data", "10"))
+
+for (a <- 0 to 50000) {
+  data = data :+ ("hello", "2")
+}
+
+val rdd = spark.sparkContext.parallelize(data)
+val df = spark.createDataFrame(rdd).toDF(columns: _*)
+
 // Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
 val ds = df
   .select("body")
@@ -335,6 +370,25 @@ val ds = df
 ### Writing the output of Batch Queries to EventHubs
 
 ```scala
+// Build a Spark session
+val spark =
+  SparkSession
+    .builder
+    .appName("Event Hub Spark App")
+    .config("spark.master", "local")
+    .getOrCreate()
+
+// Create sample data
+val columns = Seq("body", "num")
+var data = Seq(("hello", "10"), ("azure", "10"), ("data", "10"))
+
+for (a <- 0 to 50000) {
+  data = data :+ ("hello", "2")
+}
+
+val rdd = spark.sparkContext.parallelize(data)
+val df = spark.createDataFrame(rdd).toDF(columns: _*)
+
 // Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
 df.select("body")
   .write
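
For reviewers who want to try the added snippets outside the docs, here is a minimal, self-contained sketch of the batch-write path this patch documents. It is illustrative only, not part of the patch: it assumes the azure-eventhubs-spark connector JAR is on the classpath, and `writeConnectionString` is a placeholder you must replace with a real Event Hubs connection string.

```python
# Minimal sketch of the batch write path documented by this patch.
# Assumptions: the azure-eventhubs-spark connector is on the classpath;
# writeConnectionString below is a placeholder, not a working value.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EventHubWriter") \
    .getOrCreate()

# Placeholder -- substitute a valid Event Hubs connection string.
writeConnectionString = "Endpoint=sb://YOUR.NAMESPACE.servicebus.windows.net/;..."

ehWriteConf = {
    'eventhubs.connectionString': writeConnectionString
}

# Same sample data as the patch: three seed rows plus 50,001 filler rows.
columns = ["body", "num"]
data = [("hello", "10"), ("azure", "10"), ("data", "10")]
data += [("hello", "2")] * 50001
df = spark.createDataFrame(data, columns)

# Write the body column to Event Hubs; with no partition key or partition id
# set, events are distributed across partitions round-robin.
df.select("body") \
    .write \
    .format("eventhubs") \
    .options(**ehWriteConf) \
    .save()
```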