From 4ff6bdb46b8d1a9282b475931ba753806f1eef0c Mon Sep 17 00:00:00 2001 From: Peter Webb Date: Tue, 21 May 2024 15:31:15 -0400 Subject: [PATCH] Improve the memory efficiency of process_results() override (#1050) * Improve the memory efficiency of process_results() override. --------- Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com> --- .../Under the Hood-20240517-143743.yaml | 6 +++++ dbt/adapters/snowflake/connections.py | 23 +++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20240517-143743.yaml diff --git a/.changes/unreleased/Under the Hood-20240517-143743.yaml b/.changes/unreleased/Under the Hood-20240517-143743.yaml new file mode 100644 index 000000000..598c60ad4 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240517-143743.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Improve memory efficiency of the process_results() override. +time: 2024-05-17T14:37:43.7414-04:00 +custom: + Author: peterallenwebb + Issue: "1053" diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index af26279e8..1d6e31c93 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from io import StringIO from time import sleep -from typing import Optional, Tuple, Union, Any, List +from typing import Any, List, Iterable, Optional, Tuple, Union import agate from dbt_common.clients.agate_helper import empty_table @@ -443,25 +443,28 @@ def _split_queries(cls, sql): split_query = snowflake.connector.util_text.split_statements(sql_buf) return [part[0] for part in split_query] - @classmethod - def process_results(cls, column_names, rows): - # Override for Snowflake. The datetime objects returned by - # snowflake-connector-python are not pickleable, so we need - # to replace them with sane timezones - fixed = [] + @staticmethod + def _fix_rows(rows: Iterable[Iterable]) -> Iterable[Iterable]: + # See note in process_results(). for row in rows: fixed_row = [] for col in row: if isinstance(col, datetime.datetime) and col.tzinfo: offset = col.utcoffset() + assert offset is not None offset_seconds = offset.total_seconds() - new_timezone = pytz.FixedOffset(offset_seconds // 60) + new_timezone = pytz.FixedOffset(int(offset_seconds // 60)) col = col.astimezone(tz=new_timezone) fixed_row.append(col) - fixed.append(fixed_row) + yield fixed_row - return super().process_results(column_names, fixed) + @classmethod + def process_results(cls, column_names, rows): + # Override for Snowflake. The datetime objects returned by + # snowflake-connector-python are not pickleable, so we need + # to replace them with sane timezones. + return super().process_results(column_names, cls._fix_rows(rows)) def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None