Skip to content

Commit

Permalink
add python models session submission method
Browse files Browse the repository at this point in the history
  • Loading branch information
dkruh36 committed Jul 17, 2024
1 parent 2a31d3f commit 2e405cd
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
2 changes: 2 additions & 0 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from dbt.adapters.spark.python_submissions import (
JobClusterPythonJobHelper,
AllPurposeClusterPythonJobHelper,
SessionHelper
)
from dbt.adapters.base import BaseRelation
from dbt.adapters.contracts.relation import RelationType, RelationConfig
Expand Down Expand Up @@ -493,6 +494,7 @@ def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
return {
"job_cluster": JobClusterPythonJobHelper,
"all_purpose_cluster": AllPurposeClusterPythonJobHelper,
"session": SessionHelper,
}

def standardize_grants_dict(self, grants_table: "agate.Table") -> dict:
Expand Down
13 changes: 13 additions & 0 deletions dbt/adapters/spark/python_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,16 @@ def submit(self, compiled_code: str) -> None:
)
finally:
context.destroy(context_id)


class SessionHelper(PythonJobHelper):
def __init__(self, parsed_model: Dict, credentials: SparkCredentials) -> None:
pass

def submit(self, compiled_code: str) -> Any:
try:
from pyspark.sql import SparkSession
spark = SparkSession.getActiveSession()
exec(compiled_code,{"spark": spark})
except Exception as e:
raise DbtRuntimeError(f"Python model failed with traceback as:\n{e}")

0 comments on commit 2e405cd

Please sign in to comment.