The Dataflow Kafka to BigQuery template makes it easy to stream data from Kafka into BigQuery. This is a step-by-step guide on how to create a Dataflow pipeline that streams data from Kafka to BigQuery, using a custom UDF to transform the data.
The demo is delivered as Linux shell scripts. You can start a Cloud Shell session to run them.
- VPC, including networks, firewall rules and Cloud NAT, is already set up
- Google APIs (Dataflow, BigQuery, Cloud Storage) are already enabled
Change these environment variables according to your environment.
export PROJECT=forrest-datastream
export REGION=us-central1
export ZONE=us-central1-f
export NETWORK=bigdata-network
export SUBNET=us-central1-subnet
export GSA_NAME=dataflow-sa
export GSA_FULL="${GSA_NAME}@${PROJECT}.iam.gserviceaccount.com"
export GCS_DATAFLOW=gs://forrest-bigdata-ds-bucket/dataflow
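If the APIs from the prerequisites are not enabled in your project yet, one way to enable them (using the standard service names for Dataflow, Compute Engine, BigQuery and Cloud Storage) is:
gcloud services enable \
  dataflow.googleapis.com \
  compute.googleapis.com \
  bigquery.googleapis.com \
  storage.googleapis.com \
  --project=${PROJECT}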
Create a service account for the Dataflow workers and grant it GCS and BigQuery permissions.
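The role bindings below assume the service account already exists in the project. If it does not, create it first:
gcloud iam service-accounts create ${GSA_NAME} \
  --project=${PROJECT} \
  --display-name="Dataflow worker service account"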
gcloud projects add-iam-policy-binding "${PROJECT}" \
--project=${PROJECT} \
--role=roles/bigquery.admin \
--member="serviceAccount:${GSA_FULL}"
gcloud projects add-iam-policy-binding "${PROJECT}" \
--project=${PROJECT} \
--role=roles/dataflow.worker \
--member="serviceAccount:${GSA_FULL}"
gcloud projects add-iam-policy-binding "${PROJECT}" \
--project=${PROJECT} \
--role=roles/storage.objectAdmin \
--member="serviceAccount:${GSA_FULL}"
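To verify the bindings, you can list the roles granted to the service account:
gcloud projects get-iam-policy ${PROJECT} \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:${GSA_FULL}" \
  --format="table(bindings.role)"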
The following script creates a VM, uploads the JavaScript UDF to GCS, and runs Kafka in Docker:
gcloud compute instances create kafka-vm \
--project=${PROJECT} \
--zone=${ZONE} \
--network=${NETWORK} \
--subnet=${SUBNET} \
--machine-type=e2-small \
--boot-disk-size=20GB \
--scopes=cloud-platform \
--service-account=${GSA_FULL} \
--no-address \
--shielded-secure-boot \
--metadata=startup-script='#! /bin/bash
apt update
apt install git docker-compose -y
git clone https://github.com/gddezero/gcp-samples.git
cd gcp-samples/kafka_to_bigquery
# Break out of the startup-script single quotes so ${GCS_DATAFLOW} is expanded by the submitting shell (it is not set on the VM)
gsutil cp simple_udf.js '"${GCS_DATAFLOW}"'/scripts/
export IP=$(ip -o route get to 8.8.8.8 | sed -n "s/.*src \([0-9.]\+\).*/\1/p")
echo ${IP}
docker-compose up -d
# Run the message generator detached; a startup script has no TTY, so -it would fail
docker run -v /gcp-samples/kafka_to_bigquery:/script -w /script -d --network=host --rm bitnami/kafka:2.3.1 bash /script/gen_order.sh
'
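The VM has no external IP (--no-address), so to check that the startup script finished you can SSH in over IAP (assuming your firewall rules allow IAP TCP forwarding from 35.235.240.0/20) and inspect the startup-script log:
gcloud compute ssh kafka-vm \
  --project=${PROJECT} \
  --zone=${ZONE} \
  --tunnel-through-iap
# On the VM (Debian images log startup scripts to the systemd journal):
sudo journalctl -u google-startup-scripts.service --no-pager | tail -n 50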
Now a VM is deployed with a Kafka broker running. The script running in the background generates one message per second. Here is a sample message:
{
  "e": "depthUpdate",
  "t": 1676525078,
  "s": "BTCUSDT",
  "i": 162,
  "l": 160,
  "b": ["0.0024", "10"],
  "a": ["0.0026", "100"]
}
We use a JavaScript UDF to transform each message (a sketch of such a function follows this list):
- Transform the keys to human-readable names
- Transform the arrays into structs
- Add a new key called dt, derived from the key "t", to serve as the partition key in BigQuery
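For reference, here is a minimal sketch of what such a transform function could look like. The authoritative version is simple_udf.js in the cloned repo; the exact field handling below (in particular the date conversion and keeping price/quantity as strings) is an assumption for illustration.
/**
 * Illustrative Dataflow JavaScript UDF sketch (the real logic lives in simple_udf.js).
 * It receives one Kafka message as a JSON string and must return a JSON string
 * whose fields match the BigQuery table schema.
 */
function transform(inJson) {
  var msg = JSON.parse(inJson);
  var out = {
    event_name: msg.e,
    event_time: msg.t,
    symbol: msg.s,
    first_update_id: msg.i,
    final_update_id: msg.l,
    // Turn the two-element arrays into price/quantity structs
    bids: { price: msg.b[0], quantity: msg.b[1] },
    asks: { price: msg.a[0], quantity: msg.a[1] },
    // Derive the partition date (YYYY-MM-DD) from the epoch-seconds timestamp "t"
    dt: new Date(msg.t * 1000).toISOString().slice(0, 10)
  };
  return JSON.stringify(out);
}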
Run the following command to get the IP address of the Kafka VM, which will be used in the Dataflow pipeline:
export KAFKA_IP=$(gcloud compute instances describe kafka-vm \
--project=${PROJECT} \
--zone=${ZONE} \
--format json \
| jq .networkInterfaces[0].networkIP -r)
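If you prefer not to depend on jq, the same value can be extracted with gcloud's built-in formatting:
export KAFKA_IP=$(gcloud compute instances describe kafka-vm \
  --project=${PROJECT} \
  --zone=${ZONE} \
  --format='get(networkInterfaces[0].networkIP)')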
Create the BigQuery dataset:
bq --location=US mk -d ${PROJECT}:crypto
Create the BigQuery table, partitioned by the dt column that the UDF adds:
bq query --use_legacy_sql=false --project_id=${PROJECT} \
"create table ${PROJECT}.crypto.orderbook (
event_name string,
event_time int,
symbol string,
first_update_id int,
final_update_id int,
bids struct<price decimal(10,4), quantity int>,
asks struct<price decimal(10,4), quantity int>,
dt date
)
partition by dt"
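To double-check the table schema and its partitioning, you can inspect it with bq:
bq show --format=prettyjson ${PROJECT}:crypto.orderbook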
Run the Kafka to BigQuery Flex Template pipeline:
JOB_ID=kafka-to-bq-$(date +%s)
gcloud dataflow flex-template run ${JOB_ID} \
--template-file-gcs-location="gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery" \
--parameters=^#^bootstrapServers=$KAFKA_IP:9093#outputTableSpec=${PROJECT}:crypto.orderbook#inputTopics=orderbook,orderbook_okx#javascriptTextTransformGcsPath=${GCS_DATAFLOW}/scripts/simple_udf.js#javascriptTextTransformFunctionName=transform#useStorageWriteApi=true#numStorageWriteApiStreams=1#storageWriteApiTriggeringFrequencySec=5#stagingLocation=${GCS_DATAFLOW}/staging#maxNumWorkers=10#numberOfWorkerHarnessThreads=1#saveHeapDumpsToGcsPath=${GCS_DATAFLOW}/dump#serviceAccount=${GSA_FULL}#workerMachineType=n1-standard-1#usePublicIps=false \
--region=${REGION} \
--project=${PROJECT} \
--temp-location=${GCS_DATAFLOW}/temp \
--network=${NETWORK} \
--subnetwork=regions/${REGION}/subnetworks/${SUBNET} \
--enable-streaming-engine
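It can take a few minutes for the workers to start. One way to watch the job from the command line (the Dataflow page in the Cloud Console works as well):
gcloud dataflow jobs list \
  --project=${PROJECT} \
  --region=${REGION} \
  --status=active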
Query the streamed data in BigQuery:
bq query --use_legacy_sql=false --project_id=${PROJECT} \
"select * from ${PROJECT}.crypto.orderbook order by event_time desc limit 20"
event_name | event_time | symbol | first_update_id | final_update_id | bids | asks | dt |
---|---|---|---|---|---|---|---|
depthUpdate | 1678194676 | BTCUSDT | 335 | 333 | {"price":"4.7201","quantity":"27"} | {"price":"7.0086","quantity":"16"} | 2023-03-07 |
depthUpdate | 1678194675 | BTCUSDT | 334 | 332 | {"price":"9.0755","quantity":"55"} | {"price":"0.2344","quantity":"67"} | 2023-03-07 |
depthUpdate | 1678194674 | BTCUSDT | 333 | 331 | {"price":"0.186","quantity":"66"} | {"price":"9.9133","quantity":"71"} | 2023-03-07 |
depthUpdate | 1678194673 | BTCUSDT | 332 | 330 | {"price":"0.3535","quantity":"76"} | {"price":"7.874","quantity":"6"} | 2023-03-07 |
depthUpdate | 1678194672 | BTCUSDT | 331 | 329 | {"price":"9.0123","quantity":"9"} | {"price":"7.9256","quantity":"50"} | 2023-03-07 |
depthUpdate | 1678194671 | BTCUSDT | 330 | 328 | {"price":"1.9524","quantity":"91"} | {"price":"7.9874","quantity":"92"} | 2023-03-07 |
Stop the Dataflow pipeline. gcloud dataflow jobs cancel expects the job ID assigned by the service (not the job name used above), so look it up first:
DF_JOB_ID=$(gcloud dataflow jobs list --project=${PROJECT} --region=${REGION} --filter="name=${JOB_ID}" --format="value(id)")
gcloud dataflow jobs cancel ${DF_JOB_ID} --project=${PROJECT} --region=${REGION}
Terminate the Kafka VM:
gcloud compute instances delete kafka-vm \
--project=${PROJECT} \
--zone=${ZONE}
Delete the BigQuery dataset and table:
bq rm -r -f -d ${PROJECT}:crypto
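If you also want to remove the UDF, staging and temp files written under the Dataflow bucket path, you can delete them with gsutil (this removes everything under the prefix, so make sure nothing else lives there):
gsutil -m rm -r ${GCS_DATAFLOW}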