
[Feature] Add time_zone configuration for source freshness checks #11066

Open
Benjamin0313 opened this issue Nov 28, 2024 · 8 comments
Labels
awaiting_response enhancement New feature or request freshness related to the dbt source freshness command performance

Comments

@Benjamin0313

Benjamin0313 commented Nov 28, 2024

Is this a new bug in dbt-core?

  • I believe this is a new bug in dbt-core
  • I have searched the existing issues, and I could not find an existing issue for this bug

Current Behavior

Currently, when running a dbt source freshness check in a timezone other than UTC, a query like the following is compiled:

select
  max(convert_timezone('UTC', created_at_local)) as max_loaded_at,
  convert_timezone('UTC', current_timestamp()) as snapshotted_at
from raw.jaffle_shop.orders

In this case, the execution order to obtain max_loaded_at is as follows:
• Convert the timezone.
• Retrieve the maximum value of created_at_local in the converted timezone.

At this stage, the timezone conversion is applied to every record, and the maximum date is then calculated from the converted timestamps. The table is therefore fully scanned before aggregation, so when the number of records is large, the freshness check takes a significant amount of time.

Expected Behavior

The ideal steps would be as follows:
• First, retrieve the latest date from the table.
• Then, perform the timezone conversion.

select
  convert_timezone('UTC', max(created_at_local)) as max_loaded_at,
  convert_timezone('UTC', current_timestamp()) as snapshotted_at
from raw.jaffle_shop.orders

Steps To Reproduce

  1. Set up a dbt-snowflake environment.
  2. Create a sample dbt project and set up a dbt profile.
  3. Create a sample table in Snowflake for testing freshness:
create or replace table D_HARATO_DB.SAMPLE_SCHEMA.SAMPLE_TABLE as
select '2024-11-24' as TEST_DATE, 'test' as SAMPLE_VALUES
union
select '2024-11-25' as TEST_DATE, 'test' as SAMPLE_VALUES
;
  4. Set up the source freshness check in dbt:
# _source.yml
version: 2

sources:
  - name: sample_source
    database: D_HARATO_DB
    schema: SAMPLE_SCHEMA
    description: This source includes raw order data
    tables:
      - name: SAMPLE_TABLE
        description: Raw data for orders
        freshness:
          warn_after: {count: 18, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: convert_timezone('UTC', TEST_DATE)
  5. Run dbt source freshness.

Relevant log output

No response

Environment

- OS: macOS
- Python: 3.11.6
- dbt: 1.9.0

Which database adapter are you using with dbt?

snowflake

Additional Context

This is the query profile after running the freshness check. It scans the two records created, applies the timezone conversion, and then retrieves the latest date from the converted results.

(Screenshot: Snowflake query profile, 2024-11-28)
@Benjamin0313 Benjamin0313 added bug Something isn't working triage labels Nov 28, 2024
@Benjamin0313
Copy link
Author

Benjamin0313 commented Dec 2, 2024

I propose the following configuration as a solution.
_source.yaml

sources:
  - name: sample_source
    database: D_HARATO_DB
    schema: SAMPLE_SCHEMA
    description: This source includes raw order data
    tables:
      - name: SAMPLE_TABLE
        description: Raw data for orders
        freshness:
          warn_after: {count: 18, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: TEST_DATE
        time_zone: 'JST'  # <- add

dbt/include/global_project/macros/adapters/freshness.sql

{% macro collect_freshness(source, loaded_at_field, filter, time_zone=none) %}
  {{ return(adapter.dispatch('collect_freshness', 'dbt')(source, loaded_at_field, filter, time_zone)) }}
{% endmacro %}

{% macro default__collect_freshness(source, loaded_at_field, filter, time_zone=none) %}
  {% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%}
    select
      max({{ loaded_at_field }}) as max_loaded_at,
      {% if time_zone is none %}
        {{ current_timestamp() }} as snapshotted_at
      {% else %}
        convert_timezone('{{ time_zone }}', {{ current_timestamp() }}) as snapshotted_at
      {% endif %}
    from {{ source }}
    {% if filter %}
    where {{ filter }}
    {% endif %}
  {% endcall %}
  {{ return(load_result('collect_freshness')) }}
{% endmacro %}

@dbeatty10
Contributor

Thanks for reaching out @Benjamin0313 !

Could you provide the SQL for some example source data so we can try to replicate what you are seeing?

What happens if you do loaded_at_field: TEST_DATE instead of loaded_at_field: convert_timezone('UTC', TEST_DATE)?

Is the issue that the data type of TEST_DATE is a TIMESTAMP_NTZ implicitly in JST rather than a TIMESTAMP_TZ with an explicit UTC offset that corresponds to JST?

If TEST_DATE is either TIMESTAMP_TZ or TIMESTAMP_LTZ, I'm curious if it works as desired or not. Reading through the code, it looks to me like the result of collect_freshness is processed in Python here which will convert from the source time zone into UTC (as long as the datetime is aware rather than naive).
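The aware-versus-naive distinction can be sketched in plain Python (a simplified illustration of that post-processing, not dbt's actual code):

```python
from datetime import datetime, timezone, timedelta

# An aware timestamp carries its UTC offset, so it can be normalized to UTC.
jst = timezone(timedelta(hours=9))  # fixed-offset stand-in for Asia/Tokyo
aware = datetime(2024, 12, 2, 9, 0, tzinfo=jst)
print(aware.astimezone(timezone.utc))  # 2024-12-02 00:00:00+00:00

# A naive timestamp has no offset; comparing it against an aware "now" fails.
naive = datetime(2024, 12, 2, 9, 0)
now_utc = datetime.now(timezone.utc)
try:
    age = now_utc - naive
except TypeError as exc:
    print("cannot mix naive and aware datetimes:", exc)
```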

@Benjamin0313
Author

Thank you for your reply @dbeatty10 !
Here is my understanding:

Could you provide the SQL for some example source data so we can try to replicate what you are seeing?

Below are the SQL script to create the data source and the YAML file for running the freshness test:

_create_sample_data.sql

create or replace table D_HARATO_DB.SAMPLE_SCHEMA.SAMPLE_TABLE as
select '2024-12-02'::timestamp as test_date, 'id' as id
;

_source.yaml

version: 2

sources:
  - name: SAMPLE_SOURCE
    database: D_HARATO_DB
    schema: SAMPLE_SCHEMA
    description: This source includes raw order data
    tables:
      - name: SAMPLE_TABLE
        description: Raw data
        freshness:
          warn_after: {count: 18, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: TEST_DATE

What happens if you do loaded_at_field: TEST_DATE instead of loaded_at_field: convert_timezone('UTC', TEST_DATE)?

Due to time zone differences, it’s not possible to accurately verify the freshness of the data.
This is because TEST_DATE is stored in Japan’s time zone, while current_timestamp retrieves the time in UTC. The time zone difference prevents an accurate calculation of data freshness.

select
      max(test_date) as max_loaded_at, -- Timestamp in Japan's time zone
      convert_timezone('UTC', current_timestamp()) as snapshotted_at -- Timestamp in UTC
from D_HARATO_DB.SAMPLE_SCHEMA.SAMPLE_TABLE;
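To illustrate the size of the error: if a naive value stored as 09:00 JST is misread as 09:00 UTC, the data appears nine hours fresher than it really is (plain Python, purely illustrative):

```python
from datetime import datetime, timezone, timedelta

# A row loaded at 09:00 JST corresponds to 00:00 UTC in reality.
true_loaded_utc = datetime(2024, 12, 2, 0, 0, tzinfo=timezone.utc)

# If the naive JST value "2024-12-02 09:00" is misread as UTC:
misread_utc = datetime(2024, 12, 2, 9, 0, tzinfo=timezone.utc)

error = misread_utc - true_loaded_utc
print(error)  # 9:00:00 -- the data looks nine hours fresher than it is
```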

Is the issue that the data type of TEST_DATE is a TIMESTAMP_NTZ implicitly in JST rather than a TIMESTAMP_TZ with an explicit UTC offset that corresponds to JST?

Even when replacing the data type with TIMESTAMP_LTZ or TIMESTAMP_TZ, TEST_DATE is still retrieved in Japan’s time zone, while the current time is retrieved in UTC.
Therefore, I believe the presence or absence of time zone information is not the root cause of the issue.

Example with TIMESTAMP_LTZ:

create or replace table D_HARATO_DB.SAMPLE_SCHEMA.SAMPLE_TABLE as 
select '2024-12-02'::timestamp_ltz as test_date, 'id' as id;

Example with TIMESTAMP_TZ:

create or replace table D_HARATO_DB.SAMPLE_SCHEMA.SAMPLE_TABLE as 
select '2024-12-02'::timestamp_tz as test_date, 'id' as id;

The query executed in Snowflake is the same for both TIMESTAMP_LTZ and TIMESTAMP_TZ:

select
      max(test_date) as max_loaded_at,
      convert_timezone('UTC', current_timestamp()) as snapshotted_at
from D_HARATO_DB.SAMPLE_SCHEMA.SAMPLE_TABLE;

I thought the reason for explicitly using loaded_at_field: convert_timezone('Australia/Sydney', 'UTC', created_at_local) was because the time zone cannot be automatically converted.

@dbeatty10
Contributor

@Benjamin0313 It looks to me like the time zone can be automatically converted. Could you try the following?

Steps

Create these files:

models/sample_table.sql

{{ config(materialized="view") }}

select
    right(current_timestamp(), 5) as session_tz_offset,

    -- JST: Asia/Tokyo
    convert_timezone('Asia/Tokyo', {{ dbt.current_timestamp() }}) as now_offset_Tokyo,
    {{ dbt.dateadd(datepart="minute", interval=-60, from_date_or_timestamp="now_offset_Tokyo") }} as hour_ago_tz_Tokyo,
    hour_ago_tz_Tokyo::timestamp_ntz as hour_ago_ntz_Tokyo,
    hour_ago_tz_Tokyo::timestamp_ltz as hour_ago_ltz_Tokyo,

    -- PST: America/Los_Angeles
    convert_timezone('America/Los_Angeles', {{ dbt.current_timestamp() }}) as now_offset_Los_Angeles,
    {{ dbt.dateadd(datepart="minute", interval=-60, from_date_or_timestamp="now_offset_Los_Angeles") }} as hour_ago_tz_Los_Angeles,
    hour_ago_tz_Los_Angeles::timestamp_ntz as hour_ago_ntz_Los_Angeles,
    hour_ago_tz_Los_Angeles::timestamp_ltz as hour_ago_ltz_Los_Angeles,

    1 as id

models/_sources.yml

sources:
  - name: test_project
    database: "{{ target.database }}"  
    schema: "{{ target.schema }}"  
    tables:
      - name: sample_table
        freshness:
          warn_after: {count: 59, period: minute}
          error_after: {count: 60, period: minute}
        loaded_at_field: "hour_ago_tz_tokyo"

Run these commands:

dbt build --select sample_table
dbt show --inline "select * from {{ source('test_project', 'sample_table') }}" --output json
dbt source freshness

Get this output:

$ dbt show --inline "select * from {{ source('test_project', 'sample_table') }}" --output json

21:26:51  Running with dbt=1.8.8
21:26:52  Registered adapter: snowflake=1.8.4
21:26:52  Found 1 model, 1 sql operation, 1 source, 448 macros
21:26:52  
21:26:54  Concurrency: 10 threads (target='snowflake')
21:26:54  
21:26:54  {
  "show": [
    {
      "SESSION_TZ_OFFSET": "-0800",
      "NOW_OFFSET_TOKYO": "2024-12-05T06:26:54.597000+09:00",
      "HOUR_AGO_TZ_TOKYO": "2024-12-05T05:26:54.597000+09:00",
      "HOUR_AGO_NTZ_TOKYO": "2024-12-05T05:26:54.597000",
      "HOUR_AGO_LTZ_TOKYO": "2024-12-04T12:26:54.597000-08:00",
      "NOW_OFFSET_LOS_ANGELES": "2024-12-04T13:26:54.597000-08:00",
      "HOUR_AGO_TZ_LOS_ANGELES": "2024-12-04T12:26:54.597000-08:00",
      "HOUR_AGO_NTZ_LOS_ANGELES": "2024-12-04T12:26:54.597000",
      "HOUR_AGO_LTZ_LOS_ANGELES": "2024-12-04T12:26:54.597000-08:00",
      "ID": 1
    }
  ]
}
$ dbt source freshness

21:31:24  Running with dbt=1.8.8
21:31:25  Registered adapter: snowflake=1.8.4
21:31:26  Found 1 model, 1 source, 448 macros
21:31:26  
21:31:27  Pulling freshness from warehouse metadata tables for 0 sources
21:31:27  Concurrency: 10 threads (target='snowflake')
21:31:27  
21:31:27  1 of 1 START freshness of test_project.sample_table ............................ [RUN]
21:31:28  1 of 1 WARN freshness of test_project.sample_table ............................. [WARN in 1.42s]
21:31:28  
21:31:28  Finished running 1 source in 0 hours 0 minutes and 2.55 seconds (2.55s).
21:31:28  Done.

Brief explanation

The source is using the same fully-qualified database name as the sample_table model (which I should have named better like sample_view!).

By materializing that model as a view, it is always up-to-date with the current time.

I set the warning level to be one minute below the error level. This allows us to confirm that the value of loaded_at_field is indeed detected as being within the expected source freshness window (as long as it warns, but does not error).

More detail

It works for me if I use either of these:

timestamp_tz data type using the Asia/Tokyo offset:

        loaded_at_field: "hour_ago_tz_tokyo"

or timestamp_ltz data type using the offset from my Snowflake session time zone (America/Los_Angeles):

        loaded_at_field: "hour_ago_ltz_tokyo"

But it doesn't work for me if I use a naive datetime using the local time for Asia/Tokyo:

        loaded_at_field: "hour_ago_ntz_tokyo"

@Benjamin0313
Author

Thank you for your response. @dbeatty10

As you mentioned, the above method indeed allows for data freshness testing. Based on your explanation, it seems that there is a step to convert the timezone to UTC when performing the data freshness test. Therefore, I think it would be necessary to use a view temporarily or add a column for timezone conversion, such as:

convert_timezone('Asia/Tokyo', {{ dbt.current_timestamp() }}) as now_offset_Tokyo

However, converting the timezone to UTC every time for the purpose of the data freshness check can be a bit cumbersome. Hence, I believe a better approach would be to specify the timezone in the source YAML:

sources:
    tables:
      - name: SAMPLE_TABLE
        description: Raw data for orders
        freshness:
          warn_after: {count: 18, period: hour}
          error_after: {count: 24, period: hour}
        loaded_at_field: TEST_DATE
        time_zone: 'JST'  # <- Add

By specifying it this way, there is no need to perform timezone conversion every time during data freshness testing. Avoiding unnecessary processing within the model and handling data freshness conversion in the YAML ensures that no redundant logic is introduced into the model itself, making it a very elegant solution.

What do you think of this approach?

@dbeatty10 dbeatty10 changed the title [Bug] When performing data freshness tests in a timezone other than UTC, a full table scan may be executed, resulting in longer test execution times. [Feature] Add time_zone configuration for source freshness checks Dec 9, 2024
@dbeatty10 dbeatty10 added enhancement New feature or request and removed bug Something isn't working labels Dec 9, 2024
@dbeatty10
Contributor

@Benjamin0313 I converted this to a feature request since it appears that dbt is behaving as we'd expect, and you'd like to introduce a new time_zone config.

Just to make sure I understand, is the crux of your problem that you have datetimes representing the local time in Asia/Tokyo (JST) stored in the Snowflake data type TIMESTAMP_NTZ?

And since TIMESTAMP_NTZ is naive / not time zone aware, then your only option is to add convert_timezone('UTC', ...)
logic to your loaded_at_field which leads to performance inefficiencies?

As shown in #11066 (comment), source freshness checks using dbt-snowflake handle the time zone aware TIMESTAMP_TZ and TIMESTAMP_LTZ data types without needing any explicit conversion to UTC, so I'm assuming you must be referring to the TIMESTAMP_NTZ data type.
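For the TIMESTAMP_NTZ case, a time_zone config would effectively let dbt localize the naive value before normalizing it to UTC for the comparison. A minimal Python sketch of that idea (a fixed offset stands in for Asia/Tokyo; illustrative only, not dbt's implementation):

```python
from datetime import datetime, timezone, timedelta

JST = timezone(timedelta(hours=9))  # stand-in for the configured time_zone

# A naive max_loaded_at, as it might come back from a TIMESTAMP_NTZ column.
naive_max = datetime(2024, 12, 2, 9, 0)

# Localize using the configured zone, then normalize to UTC for the check.
as_utc = naive_max.replace(tzinfo=JST).astimezone(timezone.utc)
print(as_utc)  # 2024-12-02 00:00:00+00:00
```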

@dbeatty10 dbeatty10 added awaiting_response freshness related to the dbt source freshness command and removed triage labels Dec 9, 2024
@Benjamin0313
Author

@dbeatty10
Yes, that’s correct. I would like to add a time_zone option to the configuration.

Even with TIMESTAMP_TZ or TIMESTAMP_LTZ, the timestamps remain in local time, and no conversion to UTC is performed. Below are the observations from my environment:

When creating a source table using TIMESTAMP_TZ:

create or replace table D_HARATO_DB.DBT_TEST.DBT_FRESHNESS_TEST as
-- timestamp_tz
select '2024-12-02'::timestamp_tz as loaded_date, 'id' as id
;

describe table D_HARATO_DB.DBT_TEST.DBT_FRESHNESS_TEST;

The describe command confirms that the column’s data type is TIMESTAMP_TZ.

(Screenshot: DESCRIBE TABLE output showing TIMESTAMP_TZ)

When running dbt source freshness, the following query is executed on Snowflake.
If you check the max value of loaded_date, it shows 2024-12-02 00:00:00, indicating that no timezone conversion to UTC has occurred.

(Screenshot: query result for the TIMESTAMP_TZ table)

This behavior is the same for TIMESTAMP_LTZ as well.

(Screenshots: equivalent DESCRIBE and query output for TIMESTAMP_LTZ)

Therefore, even when specifying a timezone, there is no implicit timezone conversion to UTC.

@dbeatty10
Contributor

Yes, that’s correct. I would like to add a time_zone column to the configuration.

@Benjamin0313 Based on my experiments in #11066 (comment), I don't think that adding a time_zone column to the configuration would be necessary for loaded_at_field columns that have the TIMESTAMP_TZ or TIMESTAMP_LTZ data types in Snowflake.

But I'm not sure why you are seeing something different 🤔

After following the example in #11066 (comment), could you do the following and share the output you get?

Create this file:

analyses/freshness_query.sql

select
      max(hour_ago_tz_tokyo) as max_loaded_at,
      convert_timezone('UTC', current_timestamp()) as snapshotted_at
    from {{ source('test_project', 'sample_table') }}

Run this command:

dbt show -s analyses/freshness_query.sql --output json

I get the following output, which shows the timezone-aware UTC offset of +09:00 that I expected to see for MAX_LOADED_AT:

$ dbt show -s analyses/freshness_query.sql --output json

18:02:18  Running with dbt=1.9.0
18:02:18  Registered adapter: snowflake=1.9.0
18:02:19  Found 2 models, 1 snapshot, 1 analysis, 1 source, 468 macros
18:02:19  
18:02:19  Concurrency: 10 threads (target='snowflake')
18:02:19  
{
  "node": "freshness_query",
  "show": [
    {
      "MAX_LOADED_AT": "2024-12-12T02:02:20.862000+09:00",
      "SNAPSHOTTED_AT": "2024-12-11T18:02:20.862000+00:00"
    }
  ]
}
