ephemeral_dataproc_spark_dag.py
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.dataproc_operator import (
    DataprocClusterCreateOperator,
    DataProcPySparkOperator,
    DataprocClusterDeleteOperator)
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
from airflow.utils.trigger_rule import TriggerRule
##################################################################
# This file defines the DAG for the logic pictured below.        #
##################################################################
#                                                                #
#        create_cluster                                          #
#               |                                                #
#               V                                                #
#        submit_pyspark........                                  #
#               |             .                                  #
#              / \            V                                  #
#             /   \   move_failed_files                          #
#            /     \          ^                                  #
#            |     |          .                                  #
#            V     V          .                                  #
#  delete_cluster  bq_load.....                                  #
#                     |                                          #
#                     V                                          #
#         delete_transformed_files                               #
#                                                                #
# (Note: Dotted lines indicate conditional trigger rule on       #
# failure of the upstream tasks. In this case the files in the   #
# raw-{timestamp}/ GCS path will be moved to a failed-{timestamp}#
# path.)                                                         #
##################################################################
# These are stored as Variables in our Airflow environment.
BUCKET = Variable.get('gcs_bucket')  # GCS bucket with our data.
OUTPUT_TABLE = Variable.get('bq_output_table')  # BigQuery table to which results will be written.
# Path to python script that does data manipulation
PYSPARK_JOB = 'gs://' + BUCKET + '/spark-jobs/spark_avg_speed.py'
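# For reference, the objects this DAG touches are assumed to live in that same bucket, laid out
# roughly as follows. The spark-jobs/ and schemas/ paths are fixed in this file; the raw,
# transformed and failed paths are supplied at trigger time via dag_run.conf, so the names shown
# here are only illustrative:
#
#   gs://<gcs_bucket>/spark-jobs/spark_avg_speed.py   (PYSPARK_JOB above)
#   gs://<gcs_bucket>/schemas/nyc-tlc-yellow.json     (schema used by the bq_load task)
#   gs://<gcs_bucket>/raw-{timestamp}/                (input data for the PySpark job)
#   gs://<gcs_bucket>/transformed-{timestamp}/part-*  (job output loaded into BigQuery)
#   gs://<gcs_bucket>/failed-{timestamp}/             (raw files are moved here on failure)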
# Airflow parameters, see https://airflow.incubator.apache.org/code.html
DEFAULT_DAG_ARGS = {
    'owner': 'airflow',  # The owner of the task.
    # Task instance should not rely on the previous task's schedule to succeed.
    'depends_on_past': False,
    # We use this in combination with schedule_interval=None (set on the DAG below) to only
    # trigger the DAG with a POST to the REST API.
    # Alternatively, we could set this to yesterday and the DAG would be triggered upon upload
    # to the DAG folder.
    'start_date': datetime.utcnow(),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # Retry once before failing the task.
    'retry_delay': timedelta(minutes=5),  # Time between retries.
    'project_id': Variable.get('gcp_project')  # Cloud Composer project ID.
}
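# For illustration only: a run of this DAG is expected to carry a conf payload whose keys match
# the dag_run.conf lookups used by the tasks below. The values here are placeholders:
#
#   {
#       "raw_path": "raw-20180101",
#       "transformed_path": "transformed-20180101",
#       "failed_path": "failed-20180101"
#   }
#
# Such a payload can be supplied with a POST to the Airflow REST API or, for example, with
# something like:
#   gcloud composer environments run <environment> --location <location> \
#       trigger_dag -- average-speed --conf '{"raw_path": "...", ...}'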
# Create the Directed Acyclic Graph for Airflow.
with DAG('average-speed',
         default_args=DEFAULT_DAG_ARGS,
         # schedule_interval is a DAG-level argument, so it is set here rather than in
         # default_args. We only want the DAG to run when we POST to the API.
         # Alternatively, this could be set to '@daily' to run the job once a day.
         # More options at https://airflow.apache.org/scheduler.html#dag-runs
         schedule_interval=None) as dag:  # Here we are using dag as context.
    # Create the Cloud Dataproc cluster.
    # Note: this operator will be flagged as successful if a cluster with this name already exists.
    create_cluster = DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # ds_nodash is an Airflow macro for the execution date string with no dashes,
        # in YYYYMMDD format. See https://airflow.apache.org/code.html?highlight=macros#macros
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        num_workers=2,
        num_preemptible_workers=2,
        zone=Variable.get('gce_zone')
    )
    # Submit the PySpark job.
    submit_pyspark = DataProcPySparkOperator(
        task_id='run_dataproc_pyspark',
        main=PYSPARK_JOB,
        # This must match the name of the cluster created by the prior operator.
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        # Template the arguments for the PySpark job from the POST payload.
        arguments=["--gcs_path_raw={{ dag_run.conf['raw_path'] }}",
                   "--gcs_path_transformed=gs://" + BUCKET +
                   "/{{ dag_run.conf['transformed_path'] }}"]
    )
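    # The PySpark script itself lives outside this DAG; it is assumed to accept the two flags
    # passed above (for example via argparse, as sketched below) and to write CSV part files
    # under the transformed path that the bq_load task reads next.
    #
    #   parser = argparse.ArgumentParser()
    #   parser.add_argument('--gcs_path_raw')
    #   parser.add_argument('--gcs_path_transformed')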
    # Load the transformed files to a BigQuery table.
    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='GCS_to_BigQuery',
        bucket=BUCKET,
        # Wildcard for the objects created by the Spark job that will be written to BigQuery.
        # Reads the relative path to the objects transformed by the Spark job from the POST message.
        source_objects=["{{ dag_run.conf['transformed_path'] }}/part-*"],
        destination_project_dataset_table=OUTPUT_TABLE,
        schema_fields=None,
        schema_object='schemas/nyc-tlc-yellow.json',  # Relative GCS path to the schema file.
        source_format='CSV',  # Note that our Spark job does the JSON -> CSV conversion.
        create_disposition='CREATE_IF_NEEDED',
        skip_leading_rows=0,
        write_disposition='WRITE_TRUNCATE',  # If the table exists, overwrite it.
        max_bad_records=0
    )
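    # schemas/nyc-tlc-yellow.json is expected to be a standard BigQuery schema file, i.e. a JSON
    # list of field descriptors. A minimal sketch (the field names here are illustrative, not the
    # actual table schema):
    #
    #   [
    #       {"name": "pickup_datetime", "type": "TIMESTAMP", "mode": "NULLABLE"},
    #       {"name": "average_speed", "type": "FLOAT", "mode": "NULLABLE"}
    #   ]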
    # Delete the Cloud Dataproc cluster.
    delete_cluster = DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        # This must match the name of the cluster created in the prior two operators.
        cluster_name='ephemeral-spark-cluster-{{ ds_nodash }}',
        # This will tear down the cluster even if there are failures in upstream tasks.
        trigger_rule=TriggerRule.ALL_DONE
    )
    # Delete gcs files in the timestamped transformed folder.
    delete_transformed_files = BashOperator(
        task_id='delete_transformed_files',
        bash_command="gsutil -m rm -r gs://" + BUCKET + "/{{ dag_run.conf['transformed_path'] }}/"
    )
    # If the spark job or BQ load fails we rename the timestamped raw path to
    # a timestamped failed path.
    move_failed_files = BashOperator(
        task_id='move_failed_files',
        bash_command="gsutil mv gs://" + BUCKET + "/{{ dag_run.conf['raw_path'] }}/ "
                     + "gs://" + BUCKET + "/{{ dag_run.conf['failed_path'] }}/",
        trigger_rule=TriggerRule.ONE_FAILED
    )
    # Define the task dependencies. Because all of the operators above were instantiated inside
    # the `with DAG(...)` context, they are already attached to this DAG.
    create_cluster.set_downstream(submit_pyspark)
    submit_pyspark.set_downstream([delete_cluster, bq_load])
    bq_load.set_downstream(delete_transformed_files)
    move_failed_files.set_upstream([bq_load, submit_pyspark])
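    # Equivalently, the same dependencies can be written with Airflow's bitshift syntax:
    #
    #   create_cluster >> submit_pyspark >> [delete_cluster, bq_load]
    #   bq_load >> delete_transformed_files
    #   [submit_pyspark, bq_load] >> move_failed_files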