From 9ab6502196506e6ec0822ec14a8a2699dc4fabc9 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Thu, 16 May 2024 22:51:53 -0400 Subject: [PATCH] Improve the memory efficiency of process_results() override. --- dbt/adapters/snowflake/connections.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index af26279e8..1d6e31c93 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from io import StringIO from time import sleep -from typing import Optional, Tuple, Union, Any, List +from typing import Any, List, Iterable, Optional, Tuple, Union import agate from dbt_common.clients.agate_helper import empty_table @@ -443,25 +443,28 @@ def _split_queries(cls, sql): split_query = snowflake.connector.util_text.split_statements(sql_buf) return [part[0] for part in split_query] - @classmethod - def process_results(cls, column_names, rows): - # Override for Snowflake. The datetime objects returned by - # snowflake-connector-python are not pickleable, so we need - # to replace them with sane timezones - fixed = [] + @staticmethod + def _fix_rows(rows: Iterable[Iterable]) -> Iterable[Iterable]: + # See note in process_results(). for row in rows: fixed_row = [] for col in row: if isinstance(col, datetime.datetime) and col.tzinfo: offset = col.utcoffset() + assert offset is not None offset_seconds = offset.total_seconds() - new_timezone = pytz.FixedOffset(offset_seconds // 60) + new_timezone = pytz.FixedOffset(int(offset_seconds // 60)) col = col.astimezone(tz=new_timezone) fixed_row.append(col) - fixed.append(fixed_row) + yield fixed_row - return super().process_results(column_names, fixed) + @classmethod + def process_results(cls, column_names, rows): + # Override for Snowflake. The datetime objects returned by + # snowflake-connector-python are not pickleable, so we need + # to replace them with sane timezones. + return super().process_results(column_names, cls._fix_rows(rows)) def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None