Update documentation - add spark session and simple data #684

Open · wants to merge 2 commits into master
40 changes: 40 additions & 0 deletions docs/PySpark/structured-streaming-pyspark.md
@@ -237,6 +237,11 @@ ehConf = {
'eventhubs.connectionString' : connectionString
}

# build a spark session
spark = SparkSession.builder \
.appName("EventHubWriter") \
.getOrCreate()

df = spark \
.readStream \
.format("eventhubs") \
@@ -253,6 +258,11 @@ ehConf = {
'eventhubs.connectionString' : connectionString
}

# build a spark session
spark = SparkSession.builder \
.appName("EventHubWriter") \
.getOrCreate()

# Simple batch query
df = spark \
.read \
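
The batch variant is truncated the same way after `.read`. A sketch under the same assumptions (placeholder connection string, assumed `options`/`load` continuation); the final cast reflects that the connector typically delivers `body` as binary:

```python
# Sketch only: the .options()/.load() calls and the cast are assumed from the
# connector's typical batch-read pattern and are not part of this diff.
from pyspark.sql import SparkSession

connectionString = "Endpoint=sb://..."  # placeholder
ehConf = {'eventhubs.connectionString': connectionString}

spark = SparkSession.builder \
    .appName("EventHubBatchReader") \
    .getOrCreate()

# Simple batch query against Event Hubs.
df = spark \
    .read \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# body usually arrives as binary, so cast it to a string for inspection.
df.selectExpr("CAST(body AS STRING) AS body").show(truncate=False)
```
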
@@ -310,6 +320,21 @@ ehWriteConf = {
'eventhubs.connectionString' : writeConnectionString
}

# build a spark session
spark = SparkSession.builder \
.appName("EventHubWriter") \
.getOrCreate()

# create sample data
columns = ["body", "num"]
data = [("hello", "10"), ("azure", "10"), ("data", "10")]

for a in range(50001):
    data.append(("hello", "2"))

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, columns)

# Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
ds = df \
.select("body") \
@@ -338,6 +363,21 @@ ehWriteConf = {
'eventhubs.connectionString' : writeConnectionString
}

# build a spark session
spark = SparkSession.builder \
.appName("EventHubWriter") \
.getOrCreate()

# create sample data
columns = ["body", "num"]
data = [("hello", "10"), ("azure", "10"), ("data", "10")]

for a in range(50001):
    data.append(("hello", "2"))

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, columns)

# Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
ds = df \
.select("body") \
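
Rounding out the PySpark side, a sketch of the batch write end to end, assuming the connector's `write`/`options(**ehWriteConf)`/`save()` chain and a placeholder connection string. It also shows that `createDataFrame` accepts the Python list directly, so the intermediate `parallelize` step in the hunks above is optional:

```python
# Sketch only: the write/options/save chain is assumed from the connector's
# typical batch-sink pattern and is not part of this diff.
from pyspark.sql import SparkSession

writeConnectionString = "Endpoint=sb://..."  # placeholder
ehWriteConf = {'eventhubs.connectionString': writeConnectionString}

spark = SparkSession.builder \
    .appName("EventHubWriter") \
    .getOrCreate()

# Same sample data as above, built without an explicit RDD.
columns = ["body", "num"]
data = [("hello", "10"), ("azure", "10"), ("data", "10")]
data += [("hello", "2")] * 50001
df = spark.createDataFrame(data, columns)

# Each row's body column becomes one event, distributed round-robin across partitions.
df.select("body") \
    .write \
    .format("eventhubs") \
    .options(**ehWriteConf) \
    .save()
```
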
54 changes: 54 additions & 0 deletions docs/structured-streaming-eventhubs-integration.md
@@ -205,6 +205,14 @@ val connectionString = ConnectionStringBuilder("YOUR.EVENTHUB.COMPATIBLE.ENDPOIN
val connectionString = "Valid EventHubs connection string."
val ehConf = EventHubsConf(connectionString)

// build a spark Session
val spark =
SparkSession
.builder
.appName("Event Hub Spark App")
.config("spark.master", "local")
.getOrCreate()

val df = spark
.readStream
.format("eventhubs")
@@ -239,6 +247,14 @@ val df = spark
### Creating an Event Hubs Source for Batch Queries

```scala
// build a spark Session
val spark =
SparkSession
.builder
.appName("Event Hub Spark App")
.config("spark.master", "local")
.getOrCreate()

// Simple batch query
val df = spark
.read
@@ -315,6 +331,25 @@ Users can also provide properties via a `map[string, string]` if they would lik
### Creating an EventHubs Sink for Streaming Queries

```scala
// build a spark Session
val spark =
SparkSession
.builder
.appName("Event Hub Spark App")
.config("spark.master", "local")
.getOrCreate()

// Create sample data
val columns = Seq("body", "num")
var data = Seq(("hello", "10"), ("azure", "10"), ("data", "10"))

for (a <- 0 to 50000) {
  data = data :+ (("hello", "2"))
}

val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd).toDF(columns: _*)

// Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
val ds = df
.select("body")
@@ -335,6 +370,25 @@ val ds = df
### Writing the output of Batch Queries to EventHubs

```scala
// build a spark Session
val spark =
SparkSession
.builder
.appName("Event Hub Spark App")
.config("spark.master", "local")
.getOrCreate()

// Create sample data
val columns = Seq("body", "num")
var data = Seq(("hello", "10"), ("azure", "10"), ("data", "10"))

for (a <- 0 to 50000) {
  data = data :+ (("hello", "2"))
}

val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd).toDF(columns: _*)

// Write body data from a DataFrame to EventHubs. Events are distributed across partitions using round-robin model.
df.select("body")
.write