Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tidy-First]: Fix timings object for hooks and macros, and make types of timings explicit #10882

Merged
merged 6 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions core/dbt/artifacts/schemas/results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Union

from dbt.contracts.graph.nodes import ResultNode
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin
Expand All @@ -10,7 +10,13 @@

@dataclass
class TimingInfo(dbtClassMixin):
name: str
"""
Represents a step in the execution of a node.
`name` should be one of: compile, execute, or other
Do not call directly, use `collect_timing_info` instead.
"""

name: Literal["compile", "execute", "other"]
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None

Expand All @@ -21,7 +27,7 @@ def end(self):
self.completed_at = datetime.utcnow()

def to_msg_dict(self):
msg_dict = {"name": self.name}
msg_dict = {"name": str(self.name)}
if self.started_at:
msg_dict["started_at"] = datetime_to_json_string(self.started_at)
if self.completed_at:
Expand All @@ -31,7 +37,9 @@ def to_msg_dict(self):

# This is a context manager
class collect_timing_info:
def __init__(self, name: str, callback: Callable[[TimingInfo], None]) -> None:
def __init__(
self, name: Literal["compile", "execute", "other"], callback: Callable[[TimingInfo], None]
) -> None:
self.timing_info = TimingInfo(name=name)
self.callback = callback

Expand Down
19 changes: 13 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
RunningStatus,
RunStatus,
TimingInfo,
collect_timing_info,
)
from dbt.artifacts.schemas.run import RunResult
from dbt.cli.flags import Flags
Expand Down Expand Up @@ -633,7 +634,6 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
def safe_run_hooks(
self, adapter: BaseAdapter, hook_type: RunHookType, extra_context: Dict[str, Any]
) -> RunStatus:
started_at = datetime.utcnow()
ordered_hooks = self.get_hooks_by_type(hook_type)

if hook_type == RunHookType.End and ordered_hooks:
Expand All @@ -653,14 +653,20 @@ def safe_run_hooks(
hook.index = idx
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing = []
timing: List[TimingInfo] = []
failures = 1

if not failed:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(
adapter, hook, hook.index, num_hooks, extra_context
)

started_at = timing[0].started_at or datetime.utcnow()
hook.update_event_status(
started_at=started_at.isoformat(), node_status=RunningStatus.Started
)
sql = self.get_hook_sql(adapter, hook, hook.index, num_hooks, extra_context)

fire_event(
LogHookStartLine(
statement=hook_name,
Expand All @@ -670,11 +676,12 @@ def safe_run_hooks(
)
)

status, message = get_execution_status(sql, adapter)
finished_at = datetime.utcnow()
with collect_timing_info("execute", timing.append):
status, message = get_execution_status(sql, adapter)

finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()
timing = [TimingInfo(hook_name, started_at, finished_at)]
failures = 0 if status == RunStatus.Success else 1

if status == RunStatus.Success:
Expand Down
46 changes: 26 additions & 20 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import threading
import traceback
from datetime import datetime
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

import dbt_common.exceptions
from dbt.adapters.factory import get_adapter
from dbt.artifacts.schemas.results import RunStatus, TimingInfo
from dbt.artifacts.schemas.results import RunStatus, TimingInfo, collect_timing_info
from dbt.artifacts.schemas.run import RunResult, RunResultsArtifact
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
Expand Down Expand Up @@ -51,25 +51,29 @@
return res

def run(self) -> RunResultsArtifact:
start = datetime.utcnow()
self.compile_manifest()
timing: List[TimingInfo] = []

success = True
with collect_timing_info("compile", timing.append):
self.compile_manifest()

start = timing[0].started_at

success = True
package_name, macro_name = self._get_macro_parts()

try:
self._run_unsafe(package_name, macro_name)
except dbt_common.exceptions.DbtBaseException as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
with collect_timing_info("execute", timing.append):
try:
self._run_unsafe(package_name, macro_name)
except dbt_common.exceptions.DbtBaseException as exc:
fire_event(RunningOperationCaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False
except Exception as exc:
fire_event(RunningOperationUncaughtError(exc=str(exc)))
fire_event(LogDebugStackTrace(exc_info=traceback.format_exc()))
success = False

Check warning on line 74 in core/dbt/task/run_operation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run_operation.py#L71-L74

Added lines #L71 - L74 were not covered by tests

end = datetime.utcnow()
end = timing[1].completed_at

macro = (
self.manifest.find_macro_by_name(macro_name, self.config.project_name, package_name)
Expand All @@ -85,10 +89,12 @@
f"dbt could not find a macro with the name '{macro_name}' in any package"
)

execution_time = (end - start).total_seconds() if start and end else 0.0

run_result = RunResult(
adapter_response={},
status=RunStatus.Success if success else RunStatus.Error,
execution_time=(end - start).total_seconds(),
execution_time=execution_time,
failures=0 if success else 1,
message=None,
node=HookNode(
Expand All @@ -105,13 +111,13 @@
original_file_path="",
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
timing=timing,
batch_results=None,
)

results = RunResultsArtifact.from_execution_results(
generated_at=end,
elapsed_time=(end - start).total_seconds(),
generated_at=end or datetime.utcnow(),
elapsed_time=execution_time,
args={
k: v
for k, v in self.args.__dict__.items()
Expand Down
6 changes: 5 additions & 1 deletion schemas/dbt/run-results/v6.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@
"title": "TimingInfo",
"properties": {
"name": {
"type": "string"
"enum": [
"compile",
"execute",
"other"
]
},
"started_at": {
"anyOf": [
Expand Down
6 changes: 5 additions & 1 deletion schemas/dbt/sources/v3.json
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,11 @@
"title": "TimingInfo",
"properties": {
"name": {
"type": "string"
"enum": [
"compile",
"execute",
"other"
]
},
"started_at": {
"anyOf": [
Expand Down
8 changes: 8 additions & 0 deletions tests/functional/adapter/hooks/test_on_run_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ def test_results(self, project, log_counts, my_model_run_status):
for result in results
if isinstance(result.node, HookNode)
] == [(id, str(status)) for id, status in expected_results if id.startswith("operation")]

for result in results:
if result.status == RunStatus.Skipped:
continue

timing_keys = [timing.name for timing in result.timing]
assert timing_keys == ["compile", "execute"]

assert log_counts in log_output
assert "4 project hooks, 1 view model" in log_output

Expand Down
2 changes: 1 addition & 1 deletion tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg
):
# Initial run
with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"])
run_dbt(["run"], expect_pass=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is because the new dbt-adapters change altered the behavior here


# Incremental run fails
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down
17 changes: 17 additions & 0 deletions tests/functional/run_operations/test_run_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
import yaml

from dbt.artifacts.schemas.results import RunStatus
from dbt.tests.util import (
check_table_does_exist,
mkdir,
Expand Down Expand Up @@ -135,9 +136,25 @@ def test_run_operation_local_macro(self, project):
run_dbt(["deps"])

results, log_output = run_dbt_and_capture(["run-operation", "something_cool"])

for result in results:
if result.status == RunStatus.Skipped:
continue

timing_keys = [timing.name for timing in result.timing]
assert timing_keys == ["compile", "execute"]

assert "something cool" in log_output

results, log_output = run_dbt_and_capture(["run-operation", "pkg.something_cool"])

for result in results:
if result.status == RunStatus.Skipped:
continue

timing_keys = [timing.name for timing in result.timing]
assert timing_keys == ["compile", "execute"]

assert "something cool" in log_output

rm_dir("pkg")
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ def test_all_serializable(self):


def test_date_serialization():
ti = TimingInfo("test")
ti = TimingInfo("compile")
ti.begin()
ti.end()
ti_dict = ti.to_dict()
Expand Down
Loading