Skip to content

Commit

Permalink
Emit ArtifactWritten event when artifacts are written (#10940)
Browse files Browse the repository at this point in the history
* Add new `ArtifactWritten` event

* Emit ArtifactWritten event whenever an artifact is written

* Get artifact_type from class name for ArtifactWritten event

* Add changie docs

* Add test to check that ArtifactWritten events are being emitted

* Regenerate core_types_pb2.py using correct protobuf version

* Regen core_types_pb2 again, using a more correct protoc version
  • Loading branch information
QMalcolm authored Oct 30, 2024
1 parent 7f5abdc commit 8c6bec4
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 309 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241029-181728.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Emit debug logging event whenever artifacts are written
time: 2024-10-29T18:17:28.321188-05:00
custom:
Author: QMalcolm
Issue: 10937
6 changes: 4 additions & 2 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
)
from dbt.contracts.graph.unparsed import SourcePatch, UnparsedVersion
from dbt.contracts.util import SourceKey
from dbt.events.types import UnpinnedRefNewVersionAvailable
from dbt.events.types import ArtifactWritten, UnpinnedRefNewVersionAvailable
from dbt.exceptions import (
AmbiguousResourceNameRefError,
CompilationError,
Expand Down Expand Up @@ -1219,7 +1219,9 @@ def writable_manifest(self) -> "WritableManifest":
)

def write(self, path):
self.writable_manifest().write(path)
writable = self.writable_manifest()
writable.write(path)
fire_event(ArtifactWritten(artifact_type=writable.__class__.__name__, artifact_path=path))

# Called in dbt.compilation.Linker.write_graph and
# dbt.graph.queue.get and ._include_in_cost
Expand Down
3 changes: 2 additions & 1 deletion core/dbt/contracts/graph/semantic_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
)
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ModelNode
from dbt.events.types import SemanticValidationFailure
from dbt.events.types import ArtifactWritten, SemanticValidationFailure
from dbt.exceptions import ParsingError
from dbt_common.clients.system import write_file
from dbt_common.events.base_types import EventLevel
Expand Down Expand Up @@ -71,6 +71,7 @@ def write_json_to_file(self, file_path: str):
semantic_manifest = self._get_pydantic_semantic_manifest()
json = semantic_manifest.json()
write_file(file_path, json)
fire_event(ArtifactWritten(artifact_type=self.__class__.__name__, artifact_path=file_path))

def _get_pydantic_semantic_manifest(self) -> PydanticSemanticManifest:
pydantic_time_spines: List[PydanticTimeSpine] = []
Expand Down
3 changes: 3 additions & 0 deletions core/dbt/contracts/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from dbt.artifacts.schemas.results import ExecutionResult, TimingInfo
from dbt.artifacts.schemas.run import RunExecutionResult, RunResult, RunResultsArtifact
from dbt.contracts.graph.nodes import ResultNode
from dbt.events.types import ArtifactWritten
from dbt_common.dataclass_schema import dbtClassMixin
from dbt_common.events.functions import fire_event

TaskTags = Optional[Dict[str, Any]]
TaskID = uuid.UUID
Expand Down Expand Up @@ -49,6 +51,7 @@ def write(self, path: str) -> None:
args=self.args,
)
writable.write(path)
fire_event(ArtifactWritten(artifact_type=writable.__class__.__name__, artifact_path=path))

@classmethod
def from_local_result(
Expand Down
13 changes: 13 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,19 @@ message DepsScrubbedPackageNameMsg{
DepsScrubbedPackageName data = 2;
}

// P - Artifacts

// P001
message ArtifactWritten {
string artifact_type = 1;
string artifact_path = 2;
}

message ArtifactWrittenMsg {
CoreEventInfo info = 1;
ArtifactWritten data = 2;
}

// Q - Node execution

// Q001
Expand Down
614 changes: 309 additions & 305 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,19 @@ def message(self) -> str:
return f"Detected secret env var in {self.package_name}. dbt will write a scrubbed representation to the lock file. This will cause issues with subsequent 'dbt deps' using the lock file, requiring 'dbt deps --upgrade'"


# =======================================================
# P - Artifacts
# =======================================================


class ArtifactWritten(DebugLevel):
def code(self):
return "P001"

def message(self) -> str:
return f"Wrote artifact {self.artifact_type} to {self.artifact_path}"


# =======================================================
# Q - Node execution
# =======================================================
Expand Down
6 changes: 6 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
)
from dbt.contracts.graph.semantic_manifest import SemanticManifest
from dbt.events.types import (
ArtifactWritten,
DeprecatedModel,
DeprecatedReference,
InvalidDisabledTargetInTestNode,
Expand Down Expand Up @@ -2019,4 +2020,9 @@ def parse_manifest(
plugin_artifacts = pm.get_manifest_artifacts(manifest)
for path, plugin_artifact in plugin_artifacts.items():
plugin_artifact.write(path)
fire_event(
ArtifactWritten(
artifact_type=plugin_artifact.__class__.__name__, artifact_path=path
)
)
return manifest
5 changes: 5 additions & 0 deletions core/dbt/task/docs/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from dbt.constants import MANIFEST_FILE_NAME
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.events.types import ArtifactWritten
from dbt.exceptions import AmbiguousCatalogMatchError
from dbt.graph import ResourceTypeSelector
from dbt.graph.graph import UniqueId
Expand Down Expand Up @@ -309,6 +310,10 @@ def run(self) -> CatalogArtifact:

catalog_path = os.path.join(self.config.project_target_path, CATALOG_FILENAME)
results.write(catalog_path)
fire_event(
ArtifactWritten(artifact_type=results.__class__.__name__, artifact_path=catalog_path)
)

if self.args.compile:
write_manifest(self.manifest, self.config.project_target_path)

Expand Down
6 changes: 6 additions & 0 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
from dbt.events.types import (
ArtifactWritten,
LogDebugStackTrace,
RunningOperationCaughtError,
RunningOperationUncaughtError,
Expand Down Expand Up @@ -130,6 +131,11 @@ def run(self) -> RunResultsArtifact:

if self.args.write_json:
results.write(result_path)
fire_event(
ArtifactWritten(
artifact_type=results.__class__.__name__, artifact_path=result_path
)
)

return results

Expand Down
12 changes: 12 additions & 0 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.state import PreviousState
from dbt.events.types import (
ArtifactWritten,
ConcurrencyLine,
DefaultSelector,
EndRunResult,
Expand Down Expand Up @@ -427,6 +428,12 @@ def execute_nodes(self):

if self.args.write_json and hasattr(run_result, "write"):
run_result.write(self.result_path())
fire_event(
ArtifactWritten(
artifact_type=run_result.__class__.__name__,
artifact_path=self.result_path(),
)
)

self._cancel_connections(pool)
print_run_end_messages(self.node_results, keyboard_interrupt=True)
Expand Down Expand Up @@ -591,6 +598,11 @@ def run(self):
write_manifest(self.manifest, self.config.project_target_path)
if hasattr(result, "write"):
result.write(self.result_path())
fire_event(
ArtifactWritten(
artifact_type=result.__class__.__name__, artifact_path=self.result_path()
)
)

self.task_end_messages(result.results)
return result
Expand Down
37 changes: 36 additions & 1 deletion tests/functional/artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResultsArtifact
from dbt.contracts.graph.manifest import WritableManifest
from dbt.events.types import ArtifactWritten
from dbt.tests.util import (
check_datetime_between,
get_artifact,
Expand All @@ -24,6 +25,7 @@
expected_run_results,
expected_versions_run_results,
)
from tests.utils import EventCatcher

models__schema_yml = """
version: 2
Expand Down Expand Up @@ -617,8 +619,9 @@ def models(self):

# Test generic "docs generate" command
def test_run_and_generate(self, project, manifest_schema_path, run_results_schema_path):
catcher = EventCatcher(ArtifactWritten)
start_time = datetime.utcnow()
results = run_dbt(["compile"])
results = run_dbt(args=["compile"], callbacks=[catcher.catch])
assert len(results) == 7
verify_manifest(
project,
Expand All @@ -627,6 +630,38 @@ def test_run_and_generate(self, project, manifest_schema_path, run_results_schem
manifest_schema_path,
)
verify_run_results(project, expected_run_results(), start_time, run_results_schema_path)
# manifest written twice, semantic manifest written twice, run results written once
assert len(catcher.caught_events) == 5
assert (
len(
[
event
for event in catcher.caught_events
if event.data.artifact_type == "WritableManifest"
]
)
> 0
)
assert (
len(
[
event
for event in catcher.caught_events
if event.data.artifact_type == "SemanticManifest"
]
)
> 0
)
assert (
len(
[
event
for event in catcher.caught_events
if event.data.artifact_type == "RunExecutionResult"
]
)
> 0
)


class TestVerifyArtifactsReferences(BaseVerifyProject):
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ def test_event_codes(self):
core_types.DepsScrubbedPackageName(package_name=""),
core_types.DepsUnpinned(revision="", git=""),
core_types.NoNodesForSelectionCriteria(spec_raw=""),
# P - Artifacts ======================
core_types.ArtifactWritten(artifact_type="manifest", artifact_path="path/to/artifact.json"),
# Q - Node execution ======================
core_types.RunningOperationCaughtError(exc=""),
core_types.CompileComplete(),
Expand Down

0 comments on commit 8c6bec4

Please sign in to comment.