Skip to content

Commit

Permalink
Remove ResultNode usage from connections (#9211)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk authored Dec 6, 2023
1 parent e01eb30 commit ed8f5d3
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 21 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20231205-165812.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Remove usage of dbt.contracts.graph.nodes.ResultNode in dbt/adapters
time: 2023-12-05T16:58:12.932172+09:00
custom:
Author: michelleark
Issue: "9214"
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def exception_handler(self, sql: str) -> ContextManager:

def set_connection_name(self, name: Optional[str] = None) -> Connection:
"""Called by 'acquire_connection' in BaseAdapter, which is called by
'connection_named', called by 'connection_for(node)'.
'connection_named'.
Creates a connection for this thread if one doesn't already
exist, and will rename an existing connection."""

Expand Down
10 changes: 2 additions & 8 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
)
from dbt.common.clients.jinja import CallableMacroGenerator
from dbt.contracts.graph.manifest import Manifest, MacroManifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.common.events.functions import fire_event, warn_or_error
from dbt.adapters.events.types import (
CacheMiss,
Expand Down Expand Up @@ -285,22 +284,17 @@ def nice_connection_name(self) -> str:
return conn.name

@contextmanager
def connection_named(self, name: str, node: Optional[ResultNode] = None) -> Iterator[None]:
def connection_named(self, name: str, query_header_context: Any = None) -> Iterator[None]:
try:
if self.connections.query_header is not None:
self.connections.query_header.set(name, node)
self.connections.query_header.set(name, query_header_context)
self.acquire_connection(name)
yield
finally:
self.release_connection()
if self.connections.query_header is not None:
self.connections.query_header.reset()

@contextmanager
def connection_for(self, node: ResultNode) -> Iterator[None]:
with self.connection_named(node.unique_id, node):
yield

@available.parse(lambda *a, **k: ("", empty_table()))
def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
Expand Down
19 changes: 9 additions & 10 deletions core/dbt/adapters/base/query_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@

from dbt.context.manifest import generate_query_header_context
from dbt.adapters.contracts.connection import AdapterRequiredConfig, QueryComment
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.graph.manifest import Manifest
from dbt.common.exceptions import DbtRuntimeError


class NodeWrapper:
def __init__(self, node) -> None:
self._inner_node = node
class QueryHeaderContextWrapper:
def __init__(self, context) -> None:
self._inner_context = context

def __getattr__(self, name):
return getattr(self._inner_node, name, "")
return getattr(self._inner_context, name, "")


class _QueryComment(local):
Expand Down Expand Up @@ -53,7 +52,7 @@ def set(self, comment: Optional[str], append: bool):
self.append = append


QueryStringFunc = Callable[[str, Optional[NodeWrapper]], str]
QueryStringFunc = Callable[[str, Optional[QueryHeaderContextWrapper]], str]


class MacroQueryStringSetter:
Expand Down Expand Up @@ -90,10 +89,10 @@ def add(self, sql: str) -> str:
def reset(self):
self.set("master", None)

def set(self, name: str, node: Optional[ResultNode]):
wrapped: Optional[NodeWrapper] = None
if node is not None:
wrapped = NodeWrapper(node)
def set(self, name: str, query_header_context: Any):
wrapped: Optional[QueryHeaderContextWrapper] = None
if query_header_context is not None:
wrapped = QueryHeaderContextWrapper(query_header_context)
comment_str = self.generator(name, wrapped)

append = False
Expand Down
4 changes: 3 additions & 1 deletion core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ def from_run_result(self, result, start_time, timing_info):

def compile_and_execute(self, manifest, ctx):
result = None
with self.adapter.connection_for(self.node) if get_flags().INTROSPECT else nullcontext():
with self.adapter.connection_named(
self.node.unique_id, self.node
) if get_flags().INTROSPECT else nullcontext():
ctx.node.update_event_status(node_status=RunningStatus.Compiling)
fire_event(
NodeCompiling(
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def from_run_result(self, result, start_time, timing_info):
def execute(self, compiled_node, manifest):
relation = self.adapter.Relation.create_from_source(compiled_node)
# given a Source, calculate its freshness.
with self.adapter.connection_for(compiled_node):
with self.adapter.connection_named(compiled_node.unique_id, compiled_node):
self.adapter.clear_transaction()
adapter_response: Optional[AdapterResponse] = None
freshness = None
Expand Down

0 comments on commit ed8f5d3

Please sign in to comment.