```python
import snowflake.snowpark.functions as F

def model(dbt, session):
    dbt.config(
        materialized = "incremental",
        unique_key = "id",
    )
    df = dbt.ref("upstream_table")

    if dbt.is_incremental:

        # only new rows compared to max in current table
        max_from_this = f"select max(updated_at) from {dbt.this}"
        df = df.filter(df.updated_at > session.sql(max_from_this).collect()[0][0])

        # or only rows from the past 3 days
        df = df.filter(df.updated_at >= F.dateadd("day", F.lit(-3), F.current_timestamp()))

    ...

    return df
```

The same logic, on PySpark:

```python
import pyspark.sql.functions as F

def model(dbt, session):
    dbt.config(
        materialized = "incremental",
        unique_key = "id",
    )
    df = dbt.ref("upstream_table")

    if dbt.is_incremental:

        # only new rows compared to max in current table
        max_from_this = f"select max(updated_at) from {dbt.this}"
        df = df.filter(df.updated_at > session.sql(max_from_this).collect()[0][0])

        # or only rows from the past 3 days
        df = df.filter(df.updated_at >= F.date_add(F.current_timestamp(), F.lit(-3)))

    ...

    return df
```

**Note:** Incremental models are supported on BigQuery/Dataproc for the `merge` incremental strategy. The `insert_overwrite` strategy is not yet supported.
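
For example (a minimal sketch, with illustrative table and column names), a BigQuery/Dataproc Python model can declare the `merge` strategy explicitly using the same `incremental_strategy` config key that SQL models use:

```python
import pyspark.sql.functions as F

def model(dbt, session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="merge",  # the strategy currently supported on BigQuery/Dataproc
        unique_key="id",
    )
    df = dbt.ref("upstream_table")

    if dbt.is_incremental:
        # illustrative lookback filter; adapt to your data
        df = df.filter(df.updated_at >= F.date_add(F.current_timestamp(), F.lit(-3)))

    return df
```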

## Python-specific functionality

### Defining functions

In addition to defining a `model` function, the Python model can import other functions or define its own. Here's an example, on Snowpark, defining a custom `is_holiday` function:

```python
import holidays

def is_holiday(date_col):
    # Chez Jaffle
    french_holidays = holidays.France()
    is_holiday = (date_col in french_holidays)
    return is_holiday

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["holidays"]
    )

    orders_df = dbt.ref("stg_orders")

    df = orders_df.to_pandas()

    # apply our function
    # (columns need to be in uppercase on Snowpark)
    df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(is_holiday)

    # return final dataset (Pandas DataFrame)
    return df
```

The same example on PySpark, converting to and from a pandas-compatible DataFrame:

```python
import holidays

def is_holiday(date_col):
    # Chez Jaffle
    french_holidays = holidays.France()
    is_holiday = (date_col in french_holidays)
    return is_holiday

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["holidays"]
    )

    orders_df = dbt.ref("stg_orders")

    df = orders_df.to_pandas_on_spark()  # Spark 3.2+
    # df = orders_df.toPandas() in earlier versions

    # apply our function
    df["is_holiday"] = df["order_date"].apply(is_holiday)

    # convert back to PySpark
    df = df.to_spark()  # Spark 3.2+
    # df = session.createDataFrame(df) in earlier versions

    # return final dataset (PySpark DataFrame)
    return df
```

#### Configuring packages

We encourage you to explicitly configure required packages and versions so dbt can track them in project metadata. On some platforms, this configuration is also how the packages get installed at runtime. If you need specific versions of packages, specify them.
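
For example (a sketch with an illustrative package list), you can pin a package to an exact version using pip-style specifiers:

```python
def model(dbt, session):
    dbt.config(
        materialized = "table",
        # pip-style version specifiers; pin where reproducibility matters
        packages = ["holidays", "numpy==1.23.1"]
    )

    df = dbt.ref("upstream_table")
    ...
    return df
```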

#### User-defined functions (UDFs)

You can use the `udf` function or `@udf` decorator to define an "anonymous" function and call it within your `model` function's DataFrame transformation. Here's a Snowpark example that registers a UDF inline and applies it as a DataFrame operation:

```python
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
import numpy

def register_udf_add_random():
    add_random = F.udf(
        # use 'lambda' syntax, for simple functional behavior
        lambda x: x + numpy.random.normal(),
        return_type=T.FloatType(),
        input_types=[T.FloatType()]
    )
    return add_random

def model(dbt, session):

    dbt.config(
        materialized = "table",
        packages = ["numpy"]
    )

    temps_df = dbt.ref("temperatures")

    add_random = register_udf_add_random()

    # warm things up, who knows by how much
    df = temps_df.with_column("degree_plus_random", add_random("degree"))
    return df
```

**Note:** Due to a Snowpark limitation, it is not currently possible to register complex named UDFs within stored procedures, and therefore dbt Python models. We are looking to add native support for Python UDFs as a project/DAG resource type in a future release. For the time being, if you want to create a "vectorized" Python UDF via the Batch API, we recommend either:
- Writing [`create function`](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-batch.html) inside a SQL macro, to run as a hook or run-operation
- [Registering from a staged file](https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.udf.html#snowflake.snowpark.udf.UDFRegistration.register_from_file) within your Python model code
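
A loose sketch of that second option follows. The stage, file, and function here are hypothetical, and the exact arguments to `register_from_file` may differ by Snowpark version, so treat this as a starting point rather than a recipe:

```python
import snowflake.snowpark.types as T

def model(dbt, session):
    dbt.config(materialized = "table")

    # hypothetical: my_udfs.py (containing add_one) was uploaded to @my_stage beforehand
    add_one = session.udf.register_from_file(
        file_path="@my_stage/my_udfs.py",
        func_name="add_one",
        return_type=T.IntegerType(),
        input_types=[T.IntegerType()],
    )

    df = dbt.ref("upstream_table")
    return df.with_column("id_plus_one", add_one("id"))
```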

On PySpark, the equivalent `add_random` UDF can be defined with a decorator:

```python
import pyspark.sql.types as T
import pyspark.sql.functions as F
import numpy

# use a 'decorator' for more readable code
@F.udf(returnType=T.DoubleType())
def add_random(x):
    random_number = numpy.random.normal()
    return x + random_number

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["numpy"]
    )

    temps_df = dbt.ref("temperatures")

    # warm things up, who knows by how much
    df = temps_df.withColumn("degree_plus_random", add_random("degree"))
    return df
```

#### Code reuse

Currently, you cannot import or reuse Python functions defined in one dbt model in other models. This is something we'd like dbt to support. There are two patterns we're considering:
1. Creating and registering **"named" UDFs**. This process is different across data platforms and has some performance limitations. (Snowpark does support ["vectorized" UDFs](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-batch.html): pandas-like functions that you can execute in parallel.)
2. Using **private Python packages**. In addition to importing reusable functions from public PyPI packages, many data platforms support uploading custom Python assets and registering them as packages. The upload process looks different across platforms, but your code's actual `import` looks the same either way; see the sketch after this list.
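
To illustrate the second pattern: once a (hypothetical) private package such as `jaffle_utils` has been uploaded and registered on your platform, the model code that uses it is just an ordinary import:

```python
# hypothetical private package, previously uploaded/registered on your data platform
import jaffle_utils

def model(dbt, session):
    dbt.config(
        materialized = "table",
        packages = ["jaffle_utils"]  # illustrative; the upload step itself is platform-specific
    )

    df = dbt.ref("stg_orders").to_pandas()

    # reuse the shared function instead of redefining it in every model
    df["IS_HOLIDAY"] = df["ORDER_DATE"].apply(jaffle_utils.is_holiday)

    return df
```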

:::note ❓ Our questions

- Should dbt have a role in abstracting over UDFs? Should dbt support a new type of DAG node, `function`? Would the primary use case be code reuse across Python models or defining Python-language functions that can be called from SQL models?
- How can dbt help users when uploading or initializing private Python assets? Is this a new form of `dbt deps`?
- How can dbt support users who want to test custom functions? If defined as UDFs: "unit testing" in the database? If "pure" functions in packages: encourage adoption of `pytest`?

💬 Discussion: ["Python models: package, artifact/object storage, and UDF management in dbt"](https://github.com/dbt-labs/dbt-core/discussions/5741)
:::

### DataFrame API and syntax

Over the past decade, most people writing data transformations in Python have adopted the DataFrame as their common abstraction. dbt follows this convention by returning `ref()` and `source()` as DataFrames, and it expects all Python models to return a DataFrame.
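
As a small illustration (Snowpark shown, with made-up column names; PySpark reads almost identically), here is a filter-and-aggregate expressed through the DataFrame API rather than SQL:

```python
import snowflake.snowpark.functions as F

def model(dbt, session):
    dbt.config(materialized = "table")

    orders = dbt.ref("stg_orders")

    # method chaining in place of a SQL select / where / group by
    return (
        orders
        .filter(F.col("status") == "completed")
        .group_by("customer_id")
        .agg(F.count("order_id").alias("completed_orders"))
    )
```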

## Specific data platforms

### Snowflake

**Additional setup:** You will need to [acknowledge and accept Snowflake Third Party Terms](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-packages.html#getting-started) to use Anaconda packages.

**Installing packages:** Snowpark supports several popular packages via Anaconda. The complete list is at https://repo.anaconda.com/pkgs/snowflake/. Packages are installed when your model runs. Different models can have different package dependencies. If you are using third-party packages, Snowflake recommends using a dedicated virtual warehouse for best performance, rather than a warehouse shared by many concurrent users.

**About "sprocs":** dbt submits Python models to run as "stored procedures," which some people call "sprocs" for short. By default, dbt will create a named sproc containing your model's compiled Python code, and then "call" it to execute. Snowpark has a Private Preview feature for "temporary" or "anonymous" stored procedures ([docs](https://docs.snowflake.com/en/LIMITEDACCESS/call-with.html)), which are faster and leave a cleaner query history. If this feature is enabled for your account, you can switch it on for your models by configuring `use_anonymous_sproc: True`. We plan to switch this on for all dbt + Snowpark Python models in a future release.

```yml
# I asked Snowflake Support to enable this Private Preview feature,
# and now my dbt-py models run even faster!
models:
  +use_anonymous_sproc: True
```

**Docs:** ["Developer Guide: Snowpark Python"](https://docs.snowflake.com/en/developer-guide/snowpark/python/index.html)

### Databricks

**Submission methods:** Databricks supports a few different mechanisms to submit PySpark code, each with relative advantages. Some are better for supporting iterative development, while others are better for supporting lower-cost production deployments. The options are:
- `all_purpose_cluster` (default): dbt will run your Python model using the cluster ID configured as `cluster` in your connection profile or for this specific model. These clusters are more expensive but also much more responsive. We recommend using an interactive all-purpose cluster for quicker iteration in development.
  - `create_notebook: True`: dbt will upload your model's compiled PySpark code to a notebook in the namespace `/Shared/dbt_python_model/{schema}`, where `{schema}` is the configured schema for the model, and execute that notebook on the all-purpose cluster. The appeal of this approach is that you can easily open the notebook in the Databricks UI for debugging or fine-tuning right after running your model. Remember to copy any changes into your dbt `.py` model code before re-running.
  - `create_notebook: False` (default): dbt will use the [Command API](https://docs.databricks.com/dev-tools/api/1.2/index.html#run-a-command), which is slightly faster.
- `job_cluster`: dbt will upload your model's compiled PySpark code to a notebook in the namespace `/Shared/dbt_python_model/{schema}`, where `{schema}` is the configured schema for the model, and execute that notebook on a short-lived jobs cluster. For each Python model, Databricks will need to spin up the cluster, run the model's PySpark transformation, and then spin the cluster down again. As such, job clusters take longer before and after model execution, but they're also less expensive, so we recommend them for longer-running Python models in production. To use the `job_cluster` submission method, your model must be configured with `job_cluster_config`, which defines key-value properties for `new_cluster`, as defined in the [JobRunsSubmit API](https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit).

You can configure each model's `submission_method` in all the standard ways you supply configuration:

```python
def model(dbt, session):
    dbt.config(
        submission_method="all_purpose_cluster",
        create_notebook=True,
        cluster_id="abcd-1234-wxyz"
    )
    ...
```
```yml
version: 2
models:
  - name: my_python_model
    config:
      submission_method: job_cluster
      job_cluster_config:
        spark_version: ...
        node_type_id: ...
```
```yml
# dbt_project.yml
models:
  project_name:
    subfolder:
      # set defaults for all .py models defined in this subfolder
      +submission_method: all_purpose_cluster
      +create_notebook: False
      +cluster_id: abcd-1234-wxyz
```

If not configured, `dbt-spark` will use the built-in defaults: the all-purpose cluster (based on `cluster` in your connection profile) without creating a notebook. The `dbt-databricks` adapter will default to the cluster configured in `http_path`. We encourage you to configure clusters explicitly for Python models in Databricks projects.

**Installing packages:** When using all-purpose clusters, we recommend installing the packages your Python models will use on those clusters ahead of time.

**Docs:**
- [PySpark DataFrame syntax](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
- [Databricks: Introduction to DataFrames - Python](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html)

### BigQuery (Dataproc)

The `dbt-bigquery` adapter uses a service called Dataproc to submit your Python models as PySpark jobs. That Python/PySpark code will read from your tables and views in BigQuery, perform all computation in Dataproc, and write the final result back to BigQuery.

**Submission methods:** Dataproc supports two submission methods: `serverless` and `cluster`. Dataproc Serverless does not require a ready cluster, which saves on hassle and cost, but it is slower to start up and much more limited in terms of available configuration. For example, Dataproc Serverless supports only a small set of Python packages, though it does include `pandas`, `numpy`, and `scikit-learn`. (See the full list [here](https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers#example_custom_container_image_build), under "The following packages are installed in the default image.") By creating a Dataproc cluster in advance, you can fine-tune the cluster's configuration, install any PyPI packages you want, and benefit from faster, more responsive runtimes.

Use the `cluster` submission method with dedicated Dataproc clusters that you or your organization manage. Use the `serverless` submission method to avoid managing a Spark cluster. The latter may be quicker for getting started, but both are valid for production.

**Additional setup:**
- Create or use an existing [Cloud Storage bucket](https://cloud.google.com/storage/docs/creating-buckets)
- Enable Dataproc APIs for your project + region
- If using the `cluster` submission method: Create or use an existing [Dataproc cluster](https://cloud.google.com/dataproc/docs/guides/create-cluster) with the [Spark BigQuery connector initialization action](https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/connectors#bigquery-connectors). (Google recommends copying the initialization action into your own Cloud Storage bucket, rather than referencing the public example version directly.)

The following configurations are needed to run Python models on Dataproc. You can add these to your [BigQuery profile](/reference/warehouse-setups/bigquery-setup#running-python-models-on-dataproc), or configure them on specific Python models:
- `gcs_bucket`: Storage bucket to which dbt will upload your model's compiled PySpark code.
- `dataproc_region`: GCP region in which you have enabled Dataproc (for example `us-central1`)
- `dataproc_cluster_name`: Name of the Dataproc cluster to use for running the Python model (executing the PySpark job). Only required if `submission_method: cluster`.

```python
def model(dbt, session):
    dbt.config(
        submission_method="cluster",
        dataproc_cluster_name="my-favorite-cluster"
    )
    ...
```
```yml
version: 2
models:
  - name: my_python_model
    config:
      submission_method: serverless
```

Any user or service account that runs dbt Python models will need the following permissions, in addition to permissions needed for BigQuery ([docs](https://cloud.google.com/dataproc/docs/concepts/iam/iam)):
```
dataproc.clusters.use
dataproc.jobs.create
dataproc.jobs.get
dataproc.operations.get
storage.buckets.get
storage.objects.create
storage.objects.delete
```

**Installing packages:** If you are using a Dataproc cluster (as opposed to Dataproc Serverless), you can add third-party packages while creating the cluster.

Google recommends installing Python packages on Dataproc clusters via initialization actions:
- [How initialization actions are used](https://github.com/GoogleCloudDataproc/initialization-actions/blob/master/README.md#how-initialization-actions-are-used)
- [Actions for installing via `pip` or `conda`](https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/python)

You can also install packages at cluster creation time by [defining cluster properties](https://cloud.google.com/dataproc/docs/tutorials/python-configuration#image_version_20): `dataproc:pip.packages` or `dataproc:conda.packages`.

**Docs:**
- [Dataproc overview](https://cloud.google.com/dataproc/docs/concepts/overview)
- [PySpark DataFrame syntax](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)