poll .GetBatch() instead of using operation.result() #929

Merged 19 commits on Sep 26, 2023. Changes shown are from 11 commits.

6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230721-101041.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Serverless Spark to Poll with .GetBatch() instead of using operation.result()
+time: 2023-07-21T10:10:41.64843-07:00
+custom:
+  Author: wazi55
+  Issue: "734"
40 changes: 28 additions & 12 deletions dbt/adapters/bigquery/python_submissions.py
@@ -1,3 +1,4 @@
+from datetime import datetime
 from typing import Dict, Union

 from dbt.adapters.base import PythonJobHelper
@@ -9,6 +10,8 @@
 from google.api_core.client_options import ClientOptions
 from google.cloud import storage, dataproc_v1  # type: ignore
 from google.protobuf.json_format import ParseDict
+from google.cloud.dataproc_v1.types.batches import Batch
+import time

 OPERATION_RETRY_TIME = 10

@@ -102,8 +105,8 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
"job": job,
}
)
response = operation.result(polling=self.result_polling_policy)
# check if job failed
response = operation.result(polling=self.result_polling_policy)
if response.status.state == 6:
raise ValueError(response.status.details)
return response
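
A side note on the magic number in the check above (unchanged by this PR): in the Dataproc v1 API, job state 6 is `JobStatus.State.ERROR`, so the same check could be written against the enum for readability (`response` as in the diff above):

```python
from google.cloud.dataproc_v1.types import JobStatus

# JobStatus.State.ERROR carries the integer value 6 in the Dataproc v1 API.
if response.status.state == JobStatus.State.ERROR:
    raise ValueError(response.status.details)
```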
@@ -118,20 +121,33 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
     def _get_batch_id(self) -> str:
         return self.parsed_model["config"].get("batch_id")

-    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
+    def _submit_dataproc_job(self) -> Batch:
         batch = self._configure_batch()
         parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"

-        request = dataproc_v1.CreateBatchRequest(
-            parent=parent,
-            batch=batch,
-            batch_id=self._get_batch_id(),
-        )
+        batch_id = self._get_batch_id()
+        request = dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id)  # type: ignore
         # make the request
-        operation = self.job_client.create_batch(request=request)  # type: ignore
-        # this takes quite a while, waiting on GCP response to resolve
-        # (not a google-api-core issue, more likely a dataproc serverless issue)
-        response = operation.result(polling=self.result_polling_policy)
+        self.job_client.create_batch(request=request)  # type: ignore
+        # using the create_batch `.result()` method takes quite a while as it waits for all
+        # resources to tear down before returning, so we do the polling ourselves. This is a bit hacky but it works.
+        state = Batch.State.PENDING
+        response = None
+        run_time = 0
+        while state in [Batch.State.PENDING, Batch.State.RUNNING] and run_time < self.timeout:
+            time.sleep(1)
+            response = self.job_client.get_batch(  # type: ignore
+                request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),  # type: ignore
+            )
+            run_time = datetime.now().timestamp() - response.create_time.timestamp()  # type: ignore
+            state = response.state
+        if not response:
+            raise ValueError("No response from Dataproc")
+        if run_time >= self.timeout:
+            raise ValueError(
+                f"Operation did not complete within the designated timeout of {self.timeout} seconds."
+            )
+        if state != Batch.State.SUCCEEDED:
+            raise ValueError(response.state_message)
+        return response
         # there might be useful results here that we can parse and return
         # Dataproc job output is saved to the Cloud Storage bucket
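
One design note on the new loop: `run_time` is derived from the batch's `create_time`, so the timeout bounds the batch's total age (including any server-side queueing), not the time spent polling locally. A client-side alternative, sketched here as an assumption rather than what this PR does, would track a wall-clock deadline instead:

```python
import time

from google.cloud import dataproc_v1
from google.cloud.dataproc_v1.types.batches import Batch


def poll_with_wall_clock_deadline(
    client: dataproc_v1.BatchControllerClient, batch_name: str, timeout: float
) -> Batch:
    # Times out on local elapsed time rather than the batch's create_time.
    deadline = time.monotonic() + timeout
    while True:
        batch = client.get_batch(request=dataproc_v1.GetBatchRequest(name=batch_name))
        if batch.state not in (Batch.State.PENDING, Batch.State.RUNNING):
            return batch
        if time.monotonic() > deadline:
            raise ValueError(f"Batch did not complete within {timeout} seconds.")
        time.sleep(1)
```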
2 changes: 1 addition & 1 deletion tests/functional/adapter/test_python_model.py
@@ -66,7 +66,7 @@ def model(dbt, spark):
"""

models__python_array_batch_id_python = """
import pandas
import pandas as pd

 def model(dbt, spark):
     random_array = [
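
The fixture is truncated above; for context, a hypothetical minimal dbt Python model in the same style (illustrative only, not the fixture's actual body) looks like:

```python
import pandas as pd


def model(dbt, spark):
    dbt.config(materialized="table")
    # Toy stand-in for the fixture's random array.
    random_array = [1, 2, 3]
    return pd.DataFrame({"value": random_array})
```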