Make sure to use the VS Code Remote Development Extension Pack and set up the development environment using Dataproc.
- We have already gone through the details in the previous section.
Let us validate Google BigQuery Integration with Python.
from google.cloud import bigquery

client = bigquery.Client()

QUERY = (
    'SELECT * FROM `tidy-fort-361710.retail.orders` '
    'LIMIT 10'
)
query_job = client.query(QUERY)
rows = query_job.result()

for row in rows:
    print(row.order_id)
Let us go ahead and validate Google BigQuery Integration with Pyspark. We need to make sure Pyspark is launched with the appropriate jar of the Google BigQuery Spark Connector.
- Here is the command. The connector jar must match the Scala version with which Spark is built (Scala 2.12 in this case).
pyspark --jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
- Here is sample Spark code to read from a BigQuery table.
project_id = 'tidy-fort-361710'

df = spark. \
    read. \
    format('bigquery'). \
    option('table', f'{project_id}:retail.orders'). \
    load()

df.printSchema()
df.orderBy('order_id').show()
Let us review the logic to compute daily product revenue. We have already deployed the workflow using Spark SQL commands.
- Jobs to take care of file format conversion for orders and order_items.
- Job to compute daily product revenue and save the results to a folder in GCS (a sketch of this computation is included below for reference).
- We now need to create a Pyspark job to save the daily product revenue to a BigQuery table.
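For reference, here is a minimal sketch of what the daily product revenue computation might look like. This is not the deployed Spark SQL script; it assumes the standard retail_db column names and that orders and order_items are registered as tables in the current Spark Metastore database.

```python
# Sketch only: the deployed Spark SQL workflow is the authoritative version.
# Assumes retail_db-style column names on the orders and order_items tables.
daily_product_revenue = spark.sql("""
    SELECT o.order_date,
        oi.order_item_product_id,
        o.order_status,
        sum(oi.order_item_quantity) AS product_quantity,
        round(sum(oi.order_item_subtotal), 2) AS product_revenue
    FROM orders AS o
        JOIN order_items AS oi
            ON o.order_id = oi.order_item_order_id
    GROUP BY o.order_date,
        oi.order_item_product_id,
        o.order_status
""")

# Save to the GCS location that the BigQuery loader job reads from later.
daily_product_revenue. \
    write. \
    mode('overwrite'). \
    parquet('gs://airetail/retail_gold.db/daily_product_revenue')
```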
Let us go ahead and create a table in Google BigQuery to preserve the daily product revenue.
- We can use the CREATE TABLE command that is part of the script used to compute daily product revenue as a reference.
CREATE TABLE IF NOT EXISTS retail.daily_product_revenue (
    order_date STRING,
    order_item_product_id INTEGER,
    order_status STRING,
    product_quantity INTEGER,
    product_revenue DECIMAL
);
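The DDL can be run from the BigQuery console. If you prefer to run it from Python, the same BigQuery client used earlier can execute it; here is a minimal sketch, assuming the retail dataset already exists in the project.

```python
from google.cloud import bigquery

client = bigquery.Client()

ddl = """
CREATE TABLE IF NOT EXISTS retail.daily_product_revenue (
    order_date STRING,
    order_item_product_id INTEGER,
    order_status STRING,
    product_quantity INTEGER,
    product_revenue DECIMAL
)
"""

# DDL statements are submitted like any other query; result() waits for completion.
client.query(ddl).result()
```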
Now it is time for us to develop the logic to write the data in the Dataframe to the BigQuery table.
- Create Dataframe using data in GCS files.
- Set GCS bucket as temporary bucket.
- Develop logic to write to Google BigQuery Table.
daily_product_revenue = spark. \
    read. \
    parquet('gs://airetail/retail_gold.db/daily_product_revenue')

spark.conf.set('temporaryGcsBucket', 'airetail')

project_id = 'tidy-fort-361710'
daily_product_revenue. \
    write. \
    mode('overwrite'). \
    format('bigquery'). \
    option('table', f'{project_id}:retail.daily_product_revenue'). \
    save()
- Validate by querying the Google BigQuery table.
Make sure to log in and validate by querying the table in Google BigQuery.
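Here is a minimal sketch of such a validation from Python using the BigQuery client; the same queries can be run from the BigQuery console.

```python
from google.cloud import bigquery

client = bigquery.Client()

# Confirm the load by checking the row count.
count_job = client.query(
    'SELECT count(*) AS cnt FROM `tidy-fort-361710.retail.daily_product_revenue`'
)
for row in count_job.result():
    print(f'Row count: {row.cnt}')

# Preview a few records to sanity check the data.
preview_job = client.query(
    'SELECT * FROM `tidy-fort-361710.retail.daily_product_revenue` '
    'ORDER BY order_date, order_item_product_id LIMIT 10'
)
for row in preview_job.result():
    print(row.order_date, row.order_item_product_id, row.product_revenue)
```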
Let us go ahead and truncate the table in BigQuery, so that we can deploy the Spark Job to load the data into the table.
- We can use TRUNCATE TABLE retail.daily_product_revenue to truncate the table.
Let us go ahead and create the program file with the core logic to write data from Parquet files into the BigQuery table using Spark. Once the program file is ready, we can follow the below instructions to validate the application using client mode.
- The application is already made available to you under the apps folder. Make sure to review the application before running it (a minimal sketch of the core logic is included after the spark-submit command below for reference).
- Export all the relevant variables. Make sure to update the values based on your environment.
export DATA_URI='gs://airetail/retail_gold.db/daily_product_revenue'
export PROJECT_ID='tidy-fort-361710'
export DATASET_NAME='retail'
export GCS_TEMP_BUCKET='airetail'
- Run spark-submit to submit the job.
spark-submit \
--master yarn \
--name "Daily Product Revenue Loader" \
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
app.py
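For reference, here is a minimal sketch of what the core logic in app.py might look like, assuming it reads its inputs from the environment variables exported above. The application under the apps folder is the authoritative version; review it before running.

```python
# Sketch only: the actual app.py under the apps folder may differ.
import os

from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    appName('Daily Product Revenue Loader'). \
    getOrCreate()

# Inputs come from the environment variables exported above.
data_uri = os.environ['DATA_URI']
project_id = os.environ['PROJECT_ID']
dataset_name = os.environ['DATASET_NAME']
gcs_temp_bucket = os.environ['GCS_TEMP_BUCKET']

# Temporary bucket used by the Spark BigQuery connector for staging data.
spark.conf.set('temporaryGcsBucket', gcs_temp_bucket)

daily_product_revenue = spark.read.parquet(data_uri)

daily_product_revenue. \
    write. \
    mode('overwrite'). \
    format('bigquery'). \
    option('table', f'{project_id}:{dataset_name}.daily_product_revenue'). \
    save()
```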
Here are the instructions to submit the same Spark Job using cluster mode. In cluster mode the driver runs on the cluster, so the environment variables are passed as spark.yarn.appMasterEnv properties instead of being exported in the local shell.
spark-submit \
--master yarn \
--deploy-mode cluster \
--name "Daily Product Revenue Loader" \
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--conf "spark.yarn.appMasterEnv.DATA_URI=gs://airetail/retail_gold.db/daily_product_revenue" \
--conf "spark.yarn.appMasterEnv.PROJECT_ID=tidy-fort-361710" \
--conf "spark.yarn.appMasterEnv.DATASET_NAME=retail" \
--conf "spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail" \
app.py
Make sure to copy the application to GCS so that we can create the Dataproc job without any issues.
gsutil cp apps/daily_product_revenue_bq/app.py gs://airetail/apps/daily_product_revenue_bq/app.py
- One can validate whether the application can be run from GCS by submitting the job using spark-submit.
spark-submit \
--master yarn \
--deploy-mode cluster \
--name "Daily Product Revenue Loader" \
--jars gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--conf "spark.yarn.appMasterEnv.DATA_URI=gs://airetail/retail_gold.db/daily_product_revenue" \
--conf "spark.yarn.appMasterEnv.PROJECT_ID=tidy-fort-361710" \
--conf "spark.yarn.appMasterEnv.DATASET_NAME=retail" \
--conf "spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail" \
gs://airetail/apps/daily_product_revenue_bq/app.py
Here are the configurations related to Dataproc Spark Job.
- Job type: PySpark
- Main python file: gs://airetail/apps/daily_product_revenue_bq/app.py
- Jar files: gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
- Properties
| Key | Value |
|---|---|
| spark.yarn.appMasterEnv.DATA_URI | gs://airetail/retail_gold.db/daily_product_revenue |
| spark.yarn.appMasterEnv.PROJECT_ID | tidy-fort-361710 |
| spark.yarn.appMasterEnv.DATASET_NAME | retail |
| spark.yarn.appMasterEnv.GCS_TEMP_BUCKET | airetail |
| spark.submit.deployMode | cluster |
| spark.app.name | Daily Product Revenue Loader |
Let us take care of submitting the application using the gcloud dataproc command from our local development environment (Mac or Windows PC).
# Note: repeating --properties overrides earlier values, so all Spark properties
# are passed in a single comma-separated --properties flag.
gcloud dataproc jobs submit pyspark \
  --cluster=aidataprocdev \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
  --properties="spark.app.name=BigQuery Loader - Daily Product Revenue,\
spark.submit.deployMode=cluster,\
spark.yarn.appMasterEnv.DATA_URI=gs://airetail/retail_gold.db/daily_product_revenue,\
spark.yarn.appMasterEnv.PROJECT_ID=tidy-fort-361710,\
spark.yarn.appMasterEnv.DATASET_NAME=retail,\
spark.yarn.appMasterEnv.GCS_TEMP_BUCKET=airetail" \
  gs://airetail/apps/daily_product_revenue_bq/app.py
Let us make sure we create the Dataproc Workflow for the end-to-end pipeline, including the Pyspark application to load daily product revenue data from Parquet files to the BigQuery table. Here are the details related to the end-to-end pipeline.
- Cleanup Spark Metastore Databases and Tables.
- Convert Files from JSON to Parquet for Orders and Order Items.
- Compute Daily Product Revenue and save the output to GCS using Parquet Files.
- Copy Data from GCS into BigQuery Table using Spark BigQuery Connector.
We will be using the relevant gcloud dataproc commands to create the workflow. Please refer to the Notebook which has all the relevant commands.
Once the Dataproc Workflow is created, let us instantiate it and validate that it runs without any issues.
- Make sure to truncate the BigQuery table before running the Workflow.
Let us understand how to fix issues by updating job properties in Dataproc Workflow Template.
- Make sure the issue is properly diagnosed.
- Remove the job from the Workflow Template.
- Add the job back to the Workflow Template after fixing the properties relevant to the job.
- Run the template and validate.
Now the Workflow Template is ready to schedule. You can integrate the Workflow Template with your enterprise scheduler using shell scripts or commands, REST APIs, or an SDK. For example, we can use the relevant Airflow operator (from the Airflow Google provider package) to run Workflow Templates, as sketched below.
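Here is a minimal sketch of such an Airflow DAG, assuming Airflow 2.x with the apache-airflow-providers-google package installed. The template id, region, and schedule are assumptions; adjust them for your environment.

```python
# Sketch only: trigger the Dataproc Workflow Template from Airflow.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocInstantiateWorkflowTemplateOperator,
)

with DAG(
    dag_id='daily_product_revenue_pipeline',
    start_date=datetime(2022, 1, 1),
    schedule_interval='@daily',  # assumption: adjust to your scheduling needs
    catchup=False,
) as dag:
    run_workflow = DataprocInstantiateWorkflowTemplateOperator(
        task_id='run_dataproc_workflow',
        template_id='wf_daily_product_revenue',  # hypothetical template name
        project_id='tidy-fort-361710',
        region='us-central1',  # assumption: region where the template was created
    )
```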