diff --git a/.changes/unreleased/Under the Hood-20230110-115725.yaml b/.changes/unreleased/Under the Hood-20230110-115725.yaml new file mode 100644 index 00000000000..81dfefdba91 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230110-115725.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Abstract manifest generation +time: 2023-01-10T11:57:25.193965-06:00 +custom: + Author: stu-k + Issue: "6357" diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 57dc01f4aa6..1adc432daf2 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -5,7 +5,6 @@ import click from dbt.cli import requires, params as p -from dbt.config import RuntimeConfig from dbt.config.project import Project from dbt.config.profile import Profile from dbt.contracts.graph.manifest import Manifest @@ -120,10 +119,15 @@ def cli(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def build(ctx, **kwargs): """Run all Seeds, Models, Snapshots, and tests in DAG order""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = BuildTask(ctx.obj["flags"], config) + task = BuildTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -166,7 +170,6 @@ def docs(ctx, **kwargs): @p.models @p.profile @p.profiles_dir -@p.project_dir @p.select @p.selector @p.state @@ -178,10 +181,11 @@ def docs(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def docs_generate(ctx, **kwargs): """Generate the documentation website for your project""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = GenerateTask(ctx.obj["flags"], config) + task = GenerateTask(ctx.obj["flags"], ctx.obj["runtime_config"]) results = task.run() success = task.interpret_results(results) @@ -225,11 +229,18 @@ def docs_serve(ctx, **kwargs): @p.vars @p.version_check @requires.preflight +@requires.profile +@requires.project +@requires.runtime_config +@requires.manifest def compile(ctx, **kwargs): """Generates executable SQL from source, model, test, and analysis files. Compiled SQL files are written to the target/ directory.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = CompileTask(ctx.obj["flags"], config) + task = CompileTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -267,7 +278,6 @@ def debug(ctx, **kwargs): def deps(ctx, **kwargs): """Pull the most recent version of the dependencies listed in packages.yml""" task = DepsTask(ctx.obj["flags"], ctx.obj["project"]) - results = task.run() success = task.interpret_results(results) return results, success @@ -311,10 +321,15 @@ def init(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def list(ctx, **kwargs): """List the resources in your project""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = ListTask(ctx.obj["flags"], config) + task = ListTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -341,9 +356,13 @@ def list(ctx, **kwargs): @p.version_check @p.write_manifest @requires.preflight +@requires.profile +@requires.project +@requires.runtime_config +@requires.manifest(write_perf_info=True) def parse(ctx, **kwargs): """Parses the project and provides information on performance""" - click.echo(f"`{inspect.stack()[0][3]}` called\n flags: {ctx.obj['flags']}") + # manifest generation and writing happens in @requires.manifest return None, True @@ -369,10 +388,15 @@ def parse(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def run(ctx, **kwargs): """Compile SQL and execute against the current target database.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = RunTask(ctx.obj["flags"], config) + task = RunTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -392,10 +416,15 @@ def run(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def run_operation(ctx, **kwargs): """Run the named macro with any supplied arguments.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = RunOperationTask(ctx.obj["flags"], config) + task = RunOperationTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -423,10 +452,15 @@ def run_operation(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def seed(ctx, **kwargs): """Load data from csv files into your data warehouse.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = SeedTask(ctx.obj["flags"], config) + task = SeedTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -451,10 +485,15 @@ def seed(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def snapshot(ctx, **kwargs): """Execute snapshots defined in your project""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = SnapshotTask(ctx.obj["flags"], config) + task = SnapshotTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -486,10 +525,15 @@ def source(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def freshness(ctx, **kwargs): """check the current freshness of the project's sources""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = FreshnessTask(ctx.obj["flags"], config) + task = FreshnessTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -525,10 +569,15 @@ def freshness(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def test(ctx, **kwargs): """Runs tests on data in deployed models. Run this after `dbt run`""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = TestTask(ctx.obj["flags"], config) + task = TestTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) diff --git a/core/dbt/cli/requires.py b/core/dbt/cli/requires.py index 690c12bfa10..74ec80c986a 100644 --- a/core/dbt/cli/requires.py +++ b/core/dbt/cli/requires.py @@ -1,8 +1,10 @@ -from dbt.adapters.factory import adapter_management +from dbt.adapters.factory import adapter_management, register_adapter from dbt.cli.flags import Flags +from dbt.config import RuntimeConfig from dbt.config.runtime import load_project, load_profile from dbt.events.functions import setup_event_logger from dbt.exceptions import DbtProjectError +from dbt.parser.manifest import ManifestLoader, write_manifest from dbt.profiler import profiler from dbt.tracking import initialize_from_flags, track_run @@ -85,3 +87,70 @@ def wrapper(*args, **kwargs): return func(*args, **kwargs) return update_wrapper(wrapper, func) + + +def runtime_config(func): + """A decorator used by click command functions for generating a runtime + config given a profile and project. + """ + + def wrapper(*args, **kwargs): + ctx = args[0] + assert isinstance(ctx, Context) + + req_strs = ["profile", "project"] + reqs = [ctx.obj.get(req_str) for req_str in req_strs] + + if None in reqs: + raise DbtProjectError("profile and project required for runtime_config") + + ctx.obj["runtime_config"] = RuntimeConfig.from_parts( + ctx.obj["project"], + ctx.obj["profile"], + ctx.obj["flags"], + ) + + return func(*args, **kwargs) + + return update_wrapper(wrapper, func) + + +def manifest(*args0, write_perf_info=False): + """A decorator used by click command functions for generating a manifest + given a profile, project, and runtime config. This also registers the adaper + from the runtime config and writes the manifest to disc. + """ + + def outer_wrapper(func): + def wrapper(*args, **kwargs): + ctx = args[0] + assert isinstance(ctx, Context) + + req_strs = ["profile", "project", "runtime_config"] + reqs = [ctx.obj.get(dep) for dep in req_strs] + + if None in reqs: + raise DbtProjectError("profile, project, and runtime_config required for manifest") + + runtime_config = ctx.obj["runtime_config"] + register_adapter(runtime_config) + + # a manifest has already been set on the context, so don't overwrite it + if ctx.obj.get("manifest") is None: + manifest = ManifestLoader.get_full_manifest( + runtime_config, write_perf_info=write_perf_info + ) + + ctx.obj["manifest"] = manifest + if ctx.obj["flags"].write_json: + write_manifest(manifest, ctx.obj["runtime_config"].target_path) + + return func(*args, **kwargs) + + return update_wrapper(wrapper, func) + + # if there are no args, the decorator was used without params @decorator + # otherwise, the decorator was called with params @decorator(arg) + if len(args0) == 0: + return outer_wrapper + return outer_wrapper(args0[0]) diff --git a/core/dbt/docs/build/doctrees/environment.pickle b/core/dbt/docs/build/doctrees/environment.pickle index a9c2d07f929..3c70ad3fe4e 100644 Binary files a/core/dbt/docs/build/doctrees/environment.pickle and b/core/dbt/docs/build/doctrees/environment.pickle differ diff --git a/core/dbt/docs/build/doctrees/index.doctree b/core/dbt/docs/build/doctrees/index.doctree index 19135572dd5..45098290ce9 100644 Binary files a/core/dbt/docs/build/doctrees/index.doctree and b/core/dbt/docs/build/doctrees/index.doctree differ diff --git a/core/dbt/docs/build/html/_sources/index.rst.txt b/core/dbt/docs/build/html/_sources/index.rst.txt index 93d34a648f2..dcd1c82499f 100644 --- a/core/dbt/docs/build/html/_sources/index.rst.txt +++ b/core/dbt/docs/build/html/_sources/index.rst.txt @@ -6,6 +6,7 @@ How to invoke dbt commands in python runtime Right now the best way to invoke a command from python runtime is to use the `dbtRunner` we exposed .. code-block:: python + from dbt.cli.main import dbtRunner cli_args = ['run', '--project-dir', 'jaffle_shop'] diff --git a/core/dbt/docs/build/html/index.html b/core/dbt/docs/build/html/index.html index d0fd61a3227..a8b024ba555 100644 --- a/core/dbt/docs/build/html/index.html +++ b/core/dbt/docs/build/html/index.html @@ -35,6 +35,15 @@

dbt-core’s API documentation

How to invoke dbt commands in python runtime

Right now the best way to invoke a command from python runtime is to use the dbtRunner we exposed

+
from dbt.cli.main import dbtRunner
+cli_args = ['run', '--project-dir', 'jaffle_shop']
+
+# initialize the dbt runner
+dbt = dbtRunner()
+# run the command
+res, success = dbt.invoke(args)
+
+

You can also pass in pre constructed object into dbtRunner, and we will use those objects instead of loading up from the disk.

# preload profile and project
 profile = load_profile(project_dir, {}, 'testing-postgres')
@@ -92,6 +101,11 @@ 

project_dir +

resource_types

+

Type: unknown

+

TODO: No current help text

+

select

Type: unknown

@@ -313,6 +327,10 @@

vars

Command: init

+
+

project_name

+

Type: string

+

profile

Type: string

@@ -618,6 +636,10 @@

version_check

+
+

macro

+

Type: string

+

args

Type: YAML

diff --git a/core/dbt/docs/build/html/searchindex.js b/core/dbt/docs/build/html/searchindex.js index 3ed297346d9..36036732601 100644 --- a/core/dbt/docs/build/html/searchindex.js +++ b/core/dbt/docs/build/html/searchindex.js @@ -1 +1 @@ -Search.setIndex({"docnames": ["index"], "filenames": ["index.rst"], "titles": ["dbt-core\u2019s API documentation"], "terms": {"right": 0, "now": 0, "best": 0, "wai": 0, "from": 0, "i": 0, "us": 0, "dbtrunner": 0, "we": 0, "expos": 0, "you": 0, "can": 0, "also": 0, "pass": 0, "pre": 0, "construct": 0, "object": 0, "those": 0, "instead": 0, "load": 0, "up": 0, "disk": 0, "preload": 0, "project": 0, "load_profil": 0, "postgr": 0, "load_project": 0, "fals": 0, "initi": 0, "runner": 0, "thi": 0, "re": 0, "success": 0, "cli_arg": 0, "For": 0, "full": 0, "exampl": 0, "code": 0, "refer": 0, "cli": 0, "py": 0, "type": 0, "boolean": 0, "If": 0, "set": 0, "variabl": 0, "resolv": 0, "unselect": 0, "node": 0, "unknown": 0, "specifi": 0, "stop": 0, "execut": 0, "first": 0, "failur": 0, "drop": 0, "increment": 0, "fulli": 0, "recalcul": 0, "tabl": 0, "definit": 0, "choic": 0, "eager": 0, "cautiou": 0, "all": 0, "ar": 0, "adjac": 0, "resourc": 0, "even": 0, "thei": 0, "have": 0, "been": 0, "explicitli": 0, "string": 0, "which": 0, "overrid": 0, "dbt_project": 0, "yml": 0, "path": 0, "directori": 0, "look": 0, "file": 0, "current": 0, "work": 0, "home": 0, "default": 0, "its": 0, "parent": 0, "includ": 0, "The": 0, "name": 0, "defin": 0, "sampl": 0, "data": 0, "termin": 0, "given": 0, "json": 0, "compar": 0, "store": 0, "result": 0, "fail": 0, "row": 0, "databas": 0, "configur": 0, "onli": 0, "appli": 0, "dbt_target_path": 0, "int": 0, "number": 0, "while": 0, "yaml": 0, "suppli": 0, "argument": 0, "your": 0, "should": 0, "eg": 0, "my_vari": 0, "my_valu": 0, "ensur": 0, "version": 0, "match": 0, "one": 0, "requir": 0, "todo": 0, "No": 0, "help": 0, "text": 0, "avail": 0, "inform": 0, "skip": 0, "inter": 0, "setup": 0, "macro": 0, "dictionari": 0, "map": 0, "keyword": 0}, "objects": {}, "objtypes": {}, "objnames": {}, "titleterms": {"dbt": 0, "core": 0, "": 0, "api": 0, "document": 0, "how": 0, "invok": 0, "command": 0, "python": 0, "runtim": 0, "build": 0, "defer": 0, "exclud": 0, "fail_fast": 0, "full_refresh": 0, "indirect_select": 0, "profil": 0, "profiles_dir": 0, "project_dir": 0, "select": 0, "selector": 0, "show": 0, "state": 0, "store_failur": 0, "target": 0, "target_path": 0, "thread": 0, "var": 0, "version_check": 0, "clean": 0, "compil": 0, "model": 0, "parse_onli": 0, "debug": 0, "config_dir": 0, "dep": 0, "doc": 0, "init": 0, "skip_profile_setup": 0, "list": 0, "output": 0, "output_kei": 0, "resource_typ": 0, "pars": 0, "write_manifest": 0, "run": 0, "run_oper": 0, "arg": 0, "seed": 0, "snapshot": 0, "sourc": 0, "test": 0}, "envversion": {"sphinx.domains.c": 2, "sphinx.domains.changeset": 1, "sphinx.domains.citation": 1, "sphinx.domains.cpp": 8, "sphinx.domains.index": 1, "sphinx.domains.javascript": 2, "sphinx.domains.math": 2, "sphinx.domains.python": 3, "sphinx.domains.rst": 2, "sphinx.domains.std": 2, "sphinx": 57}, "alltitles": {"dbt-core\u2019s API documentation": [[0, "dbt-core-s-api-documentation"]], "How to invoke dbt commands in python runtime": [[0, "how-to-invoke-dbt-commands-in-python-runtime"]], "API documentation": [[0, "api-documentation"]], "Command: build": [[0, "dbt-section"]], "defer": [[0, "build|defer"], [0, "compile|defer"], [0, "run|defer"], [0, "snapshot|defer"], [0, "test|defer"]], "exclude": [[0, "build|exclude"], [0, "compile|exclude"], [0, "list|exclude"], [0, "list|exclude"], [0, "run|exclude"], [0, "seed|exclude"], [0, "snapshot|exclude"], [0, "test|exclude"]], "fail_fast": [[0, "build|fail_fast"], [0, "run|fail_fast"], [0, "test|fail_fast"]], "full_refresh": [[0, "build|full_refresh"], [0, "compile|full_refresh"], [0, "run|full_refresh"], [0, "seed|full_refresh"]], "indirect_selection": [[0, "build|indirect_selection"], [0, "list|indirect_selection"], [0, "list|indirect_selection"], [0, "test|indirect_selection"]], "profile": [[0, "build|profile"], [0, "clean|profile"], [0, "compile|profile"], [0, "debug|profile"], [0, "deps|profile"], [0, "init|profile"], [0, "list|profile"], [0, "list|profile"], [0, "parse|profile"], [0, "run|profile"], [0, "run-operation|profile"], [0, "seed|profile"], [0, "snapshot|profile"], [0, "test|profile"]], "profiles_dir": [[0, "build|profiles_dir"], [0, "clean|profiles_dir"], [0, "compile|profiles_dir"], [0, "debug|profiles_dir"], [0, "deps|profiles_dir"], [0, "init|profiles_dir"], [0, "list|profiles_dir"], [0, "list|profiles_dir"], [0, "parse|profiles_dir"], [0, "run|profiles_dir"], [0, "run-operation|profiles_dir"], [0, "seed|profiles_dir"], [0, "snapshot|profiles_dir"], [0, "test|profiles_dir"]], "project_dir": [[0, "build|project_dir"], [0, "clean|project_dir"], [0, "compile|project_dir"], [0, "debug|project_dir"], [0, "deps|project_dir"], [0, "init|project_dir"], [0, "list|project_dir"], [0, "list|project_dir"], [0, "parse|project_dir"], [0, "run|project_dir"], [0, "run-operation|project_dir"], [0, "seed|project_dir"], [0, "snapshot|project_dir"], [0, "test|project_dir"]], "select": [[0, "build|select"], [0, "compile|select"], [0, "list|select"], [0, "list|select"], [0, "run|select"], [0, "seed|select"], [0, "snapshot|select"], [0, "test|select"]], "selector": [[0, "build|selector"], [0, "compile|selector"], [0, "list|selector"], [0, "list|selector"], [0, "run|selector"], [0, "seed|selector"], [0, "snapshot|selector"], [0, "test|selector"]], "show": [[0, "build|show"], [0, "seed|show"]], "state": [[0, "build|state"], [0, "compile|state"], [0, "list|state"], [0, "list|state"], [0, "run|state"], [0, "seed|state"], [0, "snapshot|state"], [0, "test|state"]], "store_failures": [[0, "build|store_failures"], [0, "test|store_failures"]], "target": [[0, "build|target"], [0, "clean|target"], [0, "compile|target"], [0, "debug|target"], [0, "deps|target"], [0, "init|target"], [0, "list|target"], [0, "list|target"], [0, "parse|target"], [0, "run|target"], [0, "run-operation|target"], [0, "seed|target"], [0, "snapshot|target"], [0, "test|target"]], "target_path": [[0, "build|target_path"], [0, "compile|target_path"], [0, "parse|target_path"], [0, "run|target_path"], [0, "seed|target_path"], [0, "test|target_path"]], "threads": [[0, "build|threads"], [0, "compile|threads"], [0, "parse|threads"], [0, "run|threads"], [0, "seed|threads"], [0, "snapshot|threads"], [0, "test|threads"]], "vars": [[0, "build|vars"], [0, "clean|vars"], [0, "compile|vars"], [0, "debug|vars"], [0, "deps|vars"], [0, "init|vars"], [0, "list|vars"], [0, "list|vars"], [0, "parse|vars"], [0, "run|vars"], [0, "run-operation|vars"], [0, "seed|vars"], [0, "snapshot|vars"], [0, "test|vars"]], "version_check": [[0, "build|version_check"], [0, "compile|version_check"], [0, "debug|version_check"], [0, "parse|version_check"], [0, "run|version_check"], [0, "seed|version_check"], [0, "test|version_check"]], "Command: clean": [[0, "dbt-section"]], "Command: compile": [[0, "dbt-section"]], "models": [[0, "compile|models"], [0, "list|models"], [0, "list|models"], [0, "run|models"], [0, "seed|models"], [0, "snapshot|models"], [0, "test|models"]], "parse_only": [[0, "compile|parse_only"]], "Command: debug": [[0, "dbt-section"]], "config_dir": [[0, "debug|config_dir"]], "Command: deps": [[0, "dbt-section"]], "Command: docs": [[0, "dbt-section"]], "Command: init": [[0, "dbt-section"]], "skip_profile_setup": [[0, "init|skip_profile_setup"]], "Command: list": [[0, "dbt-section"], [0, "dbt-section"]], "output": [[0, "list|output"], [0, "list|output"]], "output_keys": [[0, "list|output_keys"], [0, "list|output_keys"]], "resource_types": [[0, "list|resource_types"], [0, "list|resource_types"]], "Command: parse": [[0, "dbt-section"]], "compile": [[0, "parse|compile"]], "write_manifest": [[0, "parse|write_manifest"]], "Command: run": [[0, "dbt-section"]], "Command: run_operation": [[0, "dbt-section"]], "args": [[0, "run-operation|args"]], "Command: seed": [[0, "dbt-section"]], "Command: snapshot": [[0, "dbt-section"]], "Command: source": [[0, "dbt-section"]], "Command: test": [[0, "dbt-section"]]}, "indexentries": {}}) \ No newline at end of file +Search.setIndex({"docnames": ["index"], "filenames": ["index.rst"], "titles": ["dbt-core\u2019s API documentation"], "terms": {"right": 0, "now": 0, "best": 0, "wai": 0, "from": 0, "i": 0, "us": 0, "dbtrunner": 0, "we": 0, "expos": 0, "cli": 0, "main": 0, "import": 0, "cli_arg": 0, "project": 0, "dir": 0, "jaffle_shop": 0, "initi": 0, "runner": 0, "re": 0, "success": 0, "you": 0, "can": 0, "also": 0, "pass": 0, "pre": 0, "construct": 0, "object": 0, "those": 0, "instead": 0, "load": 0, "up": 0, "disk": 0, "preload": 0, "load_profil": 0, "postgr": 0, "load_project": 0, "fals": 0, "thi": 0, "For": 0, "full": 0, "exampl": 0, "code": 0, "refer": 0, "py": 0, "type": 0, "boolean": 0, "If": 0, "set": 0, "variabl": 0, "resolv": 0, "unselect": 0, "node": 0, "unknown": 0, "specifi": 0, "stop": 0, "execut": 0, "first": 0, "failur": 0, "drop": 0, "increment": 0, "fulli": 0, "recalcul": 0, "tabl": 0, "definit": 0, "choic": 0, "eager": 0, "cautiou": 0, "all": 0, "ar": 0, "adjac": 0, "resourc": 0, "even": 0, "thei": 0, "have": 0, "been": 0, "explicitli": 0, "string": 0, "which": 0, "overrid": 0, "dbt_project": 0, "yml": 0, "path": 0, "directori": 0, "look": 0, "file": 0, "current": 0, "work": 0, "home": 0, "default": 0, "its": 0, "parent": 0, "todo": 0, "No": 0, "help": 0, "text": 0, "includ": 0, "The": 0, "name": 0, "defin": 0, "sampl": 0, "data": 0, "termin": 0, "given": 0, "json": 0, "compar": 0, "store": 0, "result": 0, "fail": 0, "row": 0, "databas": 0, "configur": 0, "onli": 0, "appli": 0, "dbt_target_path": 0, "int": 0, "number": 0, "while": 0, "yaml": 0, "suppli": 0, "argument": 0, "your": 0, "should": 0, "eg": 0, "my_vari": 0, "my_valu": 0, "ensur": 0, "version": 0, "match": 0, "one": 0, "requir": 0, "avail": 0, "inform": 0, "skip": 0, "inter": 0, "setup": 0, "dictionari": 0, "map": 0, "keyword": 0}, "objects": {}, "objtypes": {}, "objnames": {}, "titleterms": {"dbt": 0, "core": 0, "": 0, "api": 0, "document": 0, "how": 0, "invok": 0, "command": 0, "python": 0, "runtim": 0, "build": 0, "defer": 0, "exclud": 0, "fail_fast": 0, "full_refresh": 0, "indirect_select": 0, "profil": 0, "profiles_dir": 0, "project_dir": 0, "resource_typ": 0, "select": 0, "selector": 0, "show": 0, "state": 0, "store_failur": 0, "target": 0, "target_path": 0, "thread": 0, "var": 0, "version_check": 0, "clean": 0, "compil": 0, "model": 0, "parse_onli": 0, "debug": 0, "config_dir": 0, "dep": 0, "doc": 0, "init": 0, "project_nam": 0, "skip_profile_setup": 0, "list": 0, "output": 0, "output_kei": 0, "pars": 0, "write_manifest": 0, "run": 0, "run_oper": 0, "macro": 0, "arg": 0, "seed": 0, "snapshot": 0, "sourc": 0, "test": 0}, "envversion": {"sphinx.domains.c": 2, "sphinx.domains.changeset": 1, "sphinx.domains.citation": 1, "sphinx.domains.cpp": 8, "sphinx.domains.index": 1, "sphinx.domains.javascript": 2, "sphinx.domains.math": 2, "sphinx.domains.python": 3, "sphinx.domains.rst": 2, "sphinx.domains.std": 2, "sphinx": 57}, "alltitles": {"dbt-core\u2019s API documentation": [[0, "dbt-core-s-api-documentation"]], "How to invoke dbt commands in python runtime": [[0, "how-to-invoke-dbt-commands-in-python-runtime"]], "API documentation": [[0, "api-documentation"]], "Command: build": [[0, "dbt-section"]], "defer": [[0, "build|defer"], [0, "compile|defer"], [0, "run|defer"], [0, "snapshot|defer"], [0, "test|defer"]], "exclude": [[0, "build|exclude"], [0, "compile|exclude"], [0, "list|exclude"], [0, "list|exclude"], [0, "run|exclude"], [0, "seed|exclude"], [0, "snapshot|exclude"], [0, "test|exclude"]], "fail_fast": [[0, "build|fail_fast"], [0, "run|fail_fast"], [0, "test|fail_fast"]], "full_refresh": [[0, "build|full_refresh"], [0, "compile|full_refresh"], [0, "run|full_refresh"], [0, "seed|full_refresh"]], "indirect_selection": [[0, "build|indirect_selection"], [0, "list|indirect_selection"], [0, "list|indirect_selection"], [0, "test|indirect_selection"]], "profile": [[0, "build|profile"], [0, "clean|profile"], [0, "compile|profile"], [0, "debug|profile"], [0, "deps|profile"], [0, "init|profile"], [0, "list|profile"], [0, "list|profile"], [0, "parse|profile"], [0, "run|profile"], [0, "run-operation|profile"], [0, "seed|profile"], [0, "snapshot|profile"], [0, "test|profile"]], "profiles_dir": [[0, "build|profiles_dir"], [0, "clean|profiles_dir"], [0, "compile|profiles_dir"], [0, "debug|profiles_dir"], [0, "deps|profiles_dir"], [0, "init|profiles_dir"], [0, "list|profiles_dir"], [0, "list|profiles_dir"], [0, "parse|profiles_dir"], [0, "run|profiles_dir"], [0, "run-operation|profiles_dir"], [0, "seed|profiles_dir"], [0, "snapshot|profiles_dir"], [0, "test|profiles_dir"]], "project_dir": [[0, "build|project_dir"], [0, "clean|project_dir"], [0, "compile|project_dir"], [0, "debug|project_dir"], [0, "deps|project_dir"], [0, "init|project_dir"], [0, "list|project_dir"], [0, "list|project_dir"], [0, "parse|project_dir"], [0, "run|project_dir"], [0, "run-operation|project_dir"], [0, "seed|project_dir"], [0, "snapshot|project_dir"], [0, "test|project_dir"]], "resource_types": [[0, "build|resource_types"], [0, "list|resource_types"], [0, "list|resource_types"]], "select": [[0, "build|select"], [0, "compile|select"], [0, "list|select"], [0, "list|select"], [0, "run|select"], [0, "seed|select"], [0, "snapshot|select"], [0, "test|select"]], "selector": [[0, "build|selector"], [0, "compile|selector"], [0, "list|selector"], [0, "list|selector"], [0, "run|selector"], [0, "seed|selector"], [0, "snapshot|selector"], [0, "test|selector"]], "show": [[0, "build|show"], [0, "seed|show"]], "state": [[0, "build|state"], [0, "compile|state"], [0, "list|state"], [0, "list|state"], [0, "run|state"], [0, "seed|state"], [0, "snapshot|state"], [0, "test|state"]], "store_failures": [[0, "build|store_failures"], [0, "test|store_failures"]], "target": [[0, "build|target"], [0, "clean|target"], [0, "compile|target"], [0, "debug|target"], [0, "deps|target"], [0, "init|target"], [0, "list|target"], [0, "list|target"], [0, "parse|target"], [0, "run|target"], [0, "run-operation|target"], [0, "seed|target"], [0, "snapshot|target"], [0, "test|target"]], "target_path": [[0, "build|target_path"], [0, "compile|target_path"], [0, "parse|target_path"], [0, "run|target_path"], [0, "seed|target_path"], [0, "test|target_path"]], "threads": [[0, "build|threads"], [0, "compile|threads"], [0, "parse|threads"], [0, "run|threads"], [0, "seed|threads"], [0, "snapshot|threads"], [0, "test|threads"]], "vars": [[0, "build|vars"], [0, "clean|vars"], [0, "compile|vars"], [0, "debug|vars"], [0, "deps|vars"], [0, "init|vars"], [0, "list|vars"], [0, "list|vars"], [0, "parse|vars"], [0, "run|vars"], [0, "run-operation|vars"], [0, "seed|vars"], [0, "snapshot|vars"], [0, "test|vars"]], "version_check": [[0, "build|version_check"], [0, "compile|version_check"], [0, "debug|version_check"], [0, "parse|version_check"], [0, "run|version_check"], [0, "seed|version_check"], [0, "test|version_check"]], "Command: clean": [[0, "dbt-section"]], "Command: compile": [[0, "dbt-section"]], "models": [[0, "compile|models"], [0, "list|models"], [0, "list|models"], [0, "run|models"], [0, "seed|models"], [0, "snapshot|models"], [0, "test|models"]], "parse_only": [[0, "compile|parse_only"]], "Command: debug": [[0, "dbt-section"]], "config_dir": [[0, "debug|config_dir"]], "Command: deps": [[0, "dbt-section"]], "Command: docs": [[0, "dbt-section"]], "Command: init": [[0, "dbt-section"]], "project_name": [[0, "init|project_name"]], "skip_profile_setup": [[0, "init|skip_profile_setup"]], "Command: list": [[0, "dbt-section"], [0, "dbt-section"]], "output": [[0, "list|output"], [0, "list|output"]], "output_keys": [[0, "list|output_keys"], [0, "list|output_keys"]], "Command: parse": [[0, "dbt-section"]], "compile": [[0, "parse|compile"]], "write_manifest": [[0, "parse|write_manifest"]], "Command: run": [[0, "dbt-section"]], "Command: run_operation": [[0, "dbt-section"]], "macro": [[0, "run-operation|macro"]], "args": [[0, "run-operation|args"]], "Command: seed": [[0, "dbt-section"]], "Command: snapshot": [[0, "dbt-section"]], "Command: source": [[0, "dbt-section"]], "Command: test": [[0, "dbt-section"]]}, "indexentries": {}}) \ No newline at end of file diff --git a/core/dbt/docs/source/_ext/dbt_click.py b/core/dbt/docs/source/_ext/dbt_click.py index 1431ca91e41..7343cc6a110 100644 --- a/core/dbt/docs/source/_ext/dbt_click.py +++ b/core/dbt/docs/source/_ext/dbt_click.py @@ -44,7 +44,9 @@ def format_params(cmd) -> t.List[nodes.section]: type_str = get_type_str(param.type) param_section.append(nodes.paragraph(text=f"Type: {type_str}")) - param_section.append(nodes.paragraph(text=param.help)) + help_txt = getattr(param, "help", None) + if help_txt is not None: + param_section.append(nodes.paragraph(text=help_txt)) lines.append(param_section) return lines diff --git a/core/dbt/docs/source/index.rst b/core/dbt/docs/source/index.rst index 93d34a648f2..dcd1c82499f 100644 --- a/core/dbt/docs/source/index.rst +++ b/core/dbt/docs/source/index.rst @@ -6,6 +6,7 @@ How to invoke dbt commands in python runtime Right now the best way to invoke a command from python runtime is to use the `dbtRunner` we exposed .. code-block:: python + from dbt.cli.main import dbtRunner cli_args = ['run', '--project-dir', 'jaffle_shop'] diff --git a/core/dbt/main.py b/core/dbt/main.py index 3c23cfec4b3..d651c073765 100644 --- a/core/dbt/main.py +++ b/core/dbt/main.py @@ -30,7 +30,6 @@ import dbt.task.generate as generate_task import dbt.task.init as init_task import dbt.task.list as list_task -import dbt.task.parse as parse_task import dbt.task.run as run_task import dbt.task.run_operation as run_operation_task import dbt.task.seed as seed_task @@ -541,7 +540,9 @@ def _build_parse_subparser(subparsers, base_subparser): Parses the project and provides information on performance """, ) - sub.set_defaults(cls=parse_task.ParseTask, which="parse", rpc_method="parse") + # NOTE: changing this cls to None is breaking, but this file should go + # away once merging the click work + sub.set_defaults(cls=None, which="parse", rpc_method="parse") sub.add_argument("--write-manifest", action="store_true") sub.add_argument("--compile", action="store_true") return sub diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 787b70cfeaf..48eb9fca9c5 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -7,9 +7,11 @@ from typing import Dict, Optional, Mapping, Callable, Any, List, Type, Union, Tuple from itertools import chain import time +import json import dbt.exceptions import dbt.tracking +import dbt.utils import dbt.flags as flags from dbt.adapters.factory import ( @@ -20,6 +22,7 @@ from dbt.helper_types import PathSet from dbt.events.functions import fire_event, get_invocation_id, warn_or_error from dbt.events.types import ( + ParseCmdPerfInfoPath, PartialParsingFullReparseBecauseOfError, PartialParsingExceptionFile, PartialParsingFile, @@ -44,7 +47,7 @@ from dbt.node_types import NodeType from dbt.clients.jinja import get_rendered, MacroStack from dbt.clients.jinja_static import statically_extract_macro_calls -from dbt.clients.system import make_directory +from dbt.clients.system import make_directory, write_file from dbt.config import Project, RuntimeConfig from dbt.context.docs import generate_runtime_docs_context from dbt.context.macro_resolver import MacroResolver, TestMacroNamespace @@ -89,8 +92,10 @@ from dbt.dataclass_schema import StrEnum, dbtClassMixin +MANIFEST_FILE_NAME = "manifest.json" PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack" PARSING_STATE = DbtProcessState("parsing") +PERF_INFO_FILE_NAME = "perf_info.json" class ReparseReason(StrEnum): @@ -193,6 +198,7 @@ def get_full_manifest( config: RuntimeConfig, *, reset: bool = False, + write_perf_info=False, ) -> Manifest: adapter = get_adapter(config) # type: ignore @@ -223,6 +229,9 @@ def get_full_manifest( loader._perf_info.load_all_elapsed = time.perf_counter() - start_load_all loader.track_project_load() + if write_perf_info: + loader.write_perf_info(config.target_path) + return manifest # This is where the main action happens @@ -954,6 +963,11 @@ def process_nodes(self): self.manifest.rebuild_ref_lookup() + def write_perf_info(self, target_path: str): + path = os.path.join(target_path, PERF_INFO_FILE_NAME) + write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4)) + fire_event(ParseCmdPerfInfoPath(path=path)) + def invalid_target_fail_unless_test( node, @@ -1378,3 +1392,8 @@ def process_node(config: RuntimeConfig, manifest: Manifest, node: ManifestNode): _process_refs_for_node(manifest, config.project_name, node) ctx = generate_runtime_docs_context(config, node, manifest, config.project_name) _process_docs_for_node(ctx, node) + + +def write_manifest(manifest: Manifest, target_path: str): + path = os.path.join(target_path, MANIFEST_FILE_NAME) + manifest.write(path) diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index f6d37937e99..1a948b48b2e 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -45,10 +45,11 @@ from dbt.events.contextvars import get_node_info from .printer import print_run_result_error -from dbt.adapters.factory import register_adapter +from dbt.adapters.factory import get_adapter from dbt.config import RuntimeConfig, Project from dbt.config.profile import read_profile import dbt.exceptions +from dbt.graph import Graph class NoneConfig: @@ -96,7 +97,7 @@ def set_log_format(cls): log_manager.format_text() @classmethod - def from_args(cls, args): + def from_args(cls, args, *pargs, **kwargs): try: # This is usually RuntimeConfig config = cls.ConfigType.from_args(args) @@ -123,7 +124,7 @@ def from_args(cls, args): tracking.track_invalid_invocation(args=args, result_type=exc.result_type) raise dbt.exceptions.RuntimeException("Could not run dbt") from exc - return cls(args, config) + return cls(args, config, *pargs, **kwargs) @abstractmethod def run(self): @@ -167,17 +168,36 @@ def move_to_nearest_project_dir(project_dir: Optional[str]) -> str: return nearest_project_dir +# TODO: look into deprecating this class in favor of several small functions that +# produce the same behavior. currently this class only contains manifest compilation, +# holding a manifest, and moving direcories. class ConfiguredTask(BaseTask): ConfigType = RuntimeConfig - def __init__(self, args, config): + def __init__(self, args, config, manifest: Optional[Manifest] = None): super().__init__(args, config) - register_adapter(self.config) + self.graph: Optional[Graph] = None + self.manifest = manifest + + def compile_manifest(self): + if self.manifest is None: + raise InternalException("compile_manifest called before manifest was loaded") + + start_compile_manifest = time.perf_counter() + + # we cannot get adapter in init since it will break rpc #5579 + adapter = get_adapter(self.config) + compiler = adapter.get_compiler() + self.graph = compiler.compile(self.manifest) + + compile_time = time.perf_counter() - start_compile_manifest + if dbt.tracking.active_user is not None: + dbt.tracking.track_runnable_timing({"graph_compilation_elapsed": compile_time}) @classmethod - def from_args(cls, args): + def from_args(cls, args, *pargs, **kwargs): move_to_nearest_project_dir(args.project_dir) - return super().from_args(args) + return super().from_args(args, *pargs, **kwargs) class ExecutionContext: diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index 740d35d37e9..92eedcd97d3 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -10,6 +10,7 @@ from dbt.graph import ResourceTypeSelector from dbt.events.functions import fire_event from dbt.events.types import CompileComplete +from dbt.parser.manifest import write_manifest from dbt.node_types import NodeType @@ -85,4 +86,4 @@ def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]): selected=selected_uids, ) # TODO: is it wrong to write the manifest here? I think it's right... - self.write_manifest() + write_manifest(self.manifest, self.config.target_path) diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py index 87723a530a1..119a32acf42 100644 --- a/core/dbt/task/generate.py +++ b/core/dbt/task/generate.py @@ -31,7 +31,7 @@ CannotGenerateDocs, BuildingCatalog, ) -from dbt.parser.manifest import ManifestLoader +from dbt.parser.manifest import write_manifest import dbt.utils import dbt.compilation import dbt.exceptions @@ -199,11 +199,6 @@ def get_unique_id_mapping( class GenerateTask(CompileTask): - def _get_manifest(self) -> Manifest: - if self.manifest is None: - raise InternalException("manifest should not be None in _get_manifest") - return self.manifest - def run(self) -> CatalogArtifact: compile_results = None if self.args.compile: @@ -217,8 +212,6 @@ def run(self) -> CatalogArtifact: errors=None, compile_results=compile_results, ) - else: - self.manifest = ManifestLoader.get_full_manifest(self.config) shutil.copyfile(DOCS_INDEX_FILE_PATH, os.path.join(self.config.target_path, "index.html")) @@ -262,7 +255,7 @@ def run(self) -> CatalogArtifact: path = os.path.join(self.config.target_path, CATALOG_FILENAME) results.write(path) if self.args.compile: - self.write_manifest() + write_manifest(self.manifest, self.config.target_path) if exceptions: fire_event(WriteCatalogFailure(num_exceptions=len(exceptions))) diff --git a/core/dbt/task/list.py b/core/dbt/task/list.py index e1be8f214d3..a0b549f620f 100644 --- a/core/dbt/task/list.py +++ b/core/dbt/task/list.py @@ -2,7 +2,7 @@ from dbt.contracts.graph.nodes import Exposure, SourceDefinition, Metric from dbt.graph import ResourceTypeSelector -from dbt.task.runnable import GraphRunnableTask, ManifestTask +from dbt.task.runnable import GraphRunnableTask from dbt.task.test import TestSelector from dbt.node_types import NodeType from dbt.events.functions import warn_or_error @@ -40,8 +40,8 @@ class ListTask(GraphRunnableTask): ) ) - def __init__(self, args, config): - super().__init__(args, config) + def __init__(self, args, config, manifest): + super().__init__(args, config, manifest) if self.args.models: if self.args.select: raise RuntimeException('"models" and "select" are mutually exclusive arguments') @@ -132,7 +132,7 @@ def generate_paths(self): yield node.original_file_path def run(self): - ManifestTask._runtime_initialize(self) + self.compile_manifest() output = self.args.output if output == "selector": generator = self.generate_selectors diff --git a/core/dbt/task/parse.py b/core/dbt/task/parse.py deleted file mode 100644 index 5460bf0f3d0..00000000000 --- a/core/dbt/task/parse.py +++ /dev/null @@ -1,102 +0,0 @@ -# This task is intended to be used for diagnosis, development and -# performance analysis. -# It separates out the parsing flows for easier logging and -# debugging. -# To store cProfile performance data, execute with the '-r' -# flag and an output file: dbt -r dbt.cprof parse. -# Use a visualizer such as snakeviz to look at the output: -# snakeviz dbt.cprof -from dbt.task.base import ConfiguredTask -from dbt.adapters.factory import get_adapter -from dbt.parser.manifest import Manifest, ManifestLoader, _check_manifest -from dbt.logger import DbtProcessState -from dbt.clients.system import write_file -from dbt.events.types import ( - ManifestDependenciesLoaded, - ManifestLoaderCreated, - ManifestLoaded, - ManifestChecked, - ManifestFlatGraphBuilt, - ParseCmdStart, - ParseCmdCompiling, - ParseCmdWritingManifest, - ParseCmdDone, - ParseCmdPerfInfoPath, -) -from dbt.events.functions import fire_event -from dbt.graph import Graph -import time -from typing import Optional -import os -import json -import dbt.utils - -MANIFEST_FILE_NAME = "manifest.json" -PERF_INFO_FILE_NAME = "perf_info.json" -PARSING_STATE = DbtProcessState("parsing") - - -class ParseTask(ConfiguredTask): - def __init__(self, args, config): - super().__init__(args, config) - self.manifest: Optional[Manifest] = None - self.graph: Optional[Graph] = None - self.loader: Optional[ManifestLoader] = None - - def write_manifest(self): - path = os.path.join(self.config.target_path, MANIFEST_FILE_NAME) - self.manifest.write(path) - - def write_perf_info(self): - path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME) - write_file(path, json.dumps(self.loader._perf_info, cls=dbt.utils.JSONEncoder, indent=4)) - fire_event(ParseCmdPerfInfoPath(path=path)) - - # This method takes code that normally exists in other files - # and pulls it in here, to simplify logging and make the - # parsing flow-of-control easier to understand and manage, - # with the downside that if changes happen in those other methods, - # similar changes might need to be made here. - # ManifestLoader.get_full_manifest - # ManifestLoader.load - # ManifestLoader.load_all - - def get_full_manifest(self): - adapter = get_adapter(self.config) # type: ignore - root_config = self.config - macro_hook = adapter.connections.set_query_header - with PARSING_STATE: - start_load_all = time.perf_counter() - projects = root_config.load_dependencies() - fire_event(ManifestDependenciesLoaded()) - loader = ManifestLoader(root_config, projects, macro_hook) - fire_event(ManifestLoaderCreated()) - manifest = loader.load() - fire_event(ManifestLoaded()) - _check_manifest(manifest, root_config) - fire_event(ManifestChecked()) - manifest.build_flat_graph() - fire_event(ManifestFlatGraphBuilt()) - loader._perf_info.load_all_elapsed = time.perf_counter() - start_load_all - - self.loader = loader - self.manifest = manifest - fire_event(ManifestLoaded()) - - def compile_manifest(self): - adapter = get_adapter(self.config) - compiler = adapter.get_compiler() - self.graph = compiler.compile(self.manifest) - - def run(self): - fire_event(ParseCmdStart()) - self.get_full_manifest() - if self.args.compile: - fire_event(ParseCmdCompiling()) - self.compile_manifest() - if self.args.write_manifest: - fire_event(ParseCmdWritingManifest()) - self.write_manifest() - - self.write_perf_info() - fire_event(ParseCmdDone()) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index bc8f9a2de75..f21dfd570e9 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -284,8 +284,8 @@ def execute(self, model, manifest): class RunTask(CompileTask): - def __init__(self, args, config): - super().__init__(args, config) + def __init__(self, args, config, manifest): + super().__init__(args, config, manifest) self.ran_hooks = [] self._total_executed = 0 diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py index 9d7a469efd8..70bf39042f7 100644 --- a/core/dbt/task/run_operation.py +++ b/core/dbt/task/run_operation.py @@ -3,12 +3,11 @@ import agate -from .runnable import ManifestTask +from .base import ConfiguredTask import dbt.exceptions from dbt.adapters.factory import get_adapter from dbt.contracts.results import RunOperationResultsArtifact -from dbt.exceptions import InternalException from dbt.events.functions import fire_event from dbt.events.types import ( RunningOperationCaughtError, @@ -17,7 +16,7 @@ ) -class RunOperationTask(ManifestTask): +class RunOperationTask(ConfiguredTask): def _get_macro_parts(self): macro_name = self.args.macro if "." in macro_name: @@ -27,10 +26,6 @@ def _get_macro_parts(self): return package_name, macro_name - def compile_manifest(self) -> None: - if self.manifest is None: - raise InternalException("manifest was None in compile_manifest") - def _run_unsafe(self) -> agate.Table: adapter = get_adapter(self.config) @@ -47,7 +42,7 @@ def _run_unsafe(self) -> agate.Table: def run(self) -> RunOperationResultsArtifact: start = datetime.utcnow() - self._runtime_initialize() + self.compile_manifest() try: self._run_unsafe() except dbt.exceptions.Exception as exc: diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index fa8fdb724a8..e86cdbd2973 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -1,6 +1,5 @@ import os import time -import json from pathlib import Path from abc import abstractmethod from concurrent.futures import as_completed @@ -13,7 +12,6 @@ print_run_end_messages, ) -from dbt.clients.system import write_file from dbt.task.base import ConfiguredTask from dbt.adapters.base import BaseRelation from dbt.adapters.factory import get_adapter @@ -39,7 +37,6 @@ NothingToDo, ) from dbt.events.contextvars import log_contextvars -from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.nodes import SourceDefinition, ResultNode from dbt.contracts.results import NodeStatus, RunExecutionResult, RunningStatus from dbt.contracts.state import PreviousState @@ -50,8 +47,8 @@ FailFastException, ) -from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, parse_difference, Graph -from dbt.parser.manifest import ManifestLoader +from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, parse_difference +from dbt.parser.manifest import write_manifest import dbt.tracking import dbt.exceptions @@ -59,53 +56,15 @@ import dbt.utils RESULT_FILE_NAME = "run_results.json" -MANIFEST_FILE_NAME = "manifest.json" RUNNING_STATE = DbtProcessState("running") -class ManifestTask(ConfiguredTask): - def __init__(self, args, config): - super().__init__(args, config) - self.manifest: Optional[Manifest] = None - self.graph: Optional[Graph] = None - - def write_manifest(self): - if flags.WRITE_JSON: - path = os.path.join(self.config.target_path, MANIFEST_FILE_NAME) - self.manifest.write(path) - if os.getenv("DBT_WRITE_FILES"): - path = os.path.join(self.config.target_path, "files.json") - write_file(path, json.dumps(self.manifest.files, cls=dbt.utils.JSONEncoder, indent=4)) - - def load_manifest(self): - self.manifest = ManifestLoader.get_full_manifest(self.config) - self.write_manifest() - - def compile_manifest(self): - if self.manifest is None: - raise InternalException("compile_manifest called before manifest was loaded") - - # we cannot get adapter in init since it will break rpc #5579 - adapter = get_adapter(self.config) - compiler = adapter.get_compiler() - self.graph = compiler.compile(self.manifest) - - def _runtime_initialize(self): - self.load_manifest() - - start_compile_manifest = time.perf_counter() - self.compile_manifest() - compile_time = time.perf_counter() - start_compile_manifest - if dbt.tracking.active_user is not None: - dbt.tracking.track_runnable_timing({"graph_compilation_elapsed": compile_time}) - - -class GraphRunnableTask(ManifestTask): +class GraphRunnableTask(ConfiguredTask): MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error] - def __init__(self, args, config): - super().__init__(args, config) + def __init__(self, args, config, manifest): + super().__init__(args, config, manifest) self.job_queue: Optional[GraphQueue] = None self._flattened_nodes: Optional[List[ResultNode]] = None @@ -165,9 +124,9 @@ def get_graph_queue(self) -> GraphQueue: return selector.get_graph_queue(spec) def _runtime_initialize(self): - super()._runtime_initialize() + self.compile_manifest() if self.manifest is None or self.graph is None: - raise InternalException("_runtime_initialize never loaded the manifest and graph!") + raise InternalException("_runtime_initialize never loaded the graph!") self.job_queue = self.get_graph_queue() @@ -490,7 +449,7 @@ def run(self): ) if flags.WRITE_JSON: - self.write_manifest() + write_manifest(self.manifest, self.config.target_path) self.write_result(result) self.task_end_messages(result.results) diff --git a/test/unit/test_config.py b/test/unit/test_config.py index 456f16fade6..d45ee86587d 100644 --- a/test/unit/test_config.py +++ b/test/unit/test_config.py @@ -12,15 +12,15 @@ import dbt.config import dbt.exceptions +import dbt.tracking from dbt import flags from dbt.adapters.factory import load_plugin from dbt.adapters.postgres import PostgresCredentials -from dbt.context.base import generate_base_context from dbt.contracts.connection import QueryComment, DEFAULT_QUERY_COMMENT from dbt.contracts.project import PackageConfig, LocalPackage, GitPackage from dbt.node_types import NodeType from dbt.semver import VersionSpecifier -from dbt.task.run_operation import RunOperationTask +from dbt.task.base import ConfiguredTask from .utils import normalize @@ -909,7 +909,12 @@ def test_with_invalid_package(self): dbt.config.Project.from_project_root(self.project_dir, renderer) -class TestRunOperationTask(BaseFileTest): +class InheritsFromConfiguredTask(ConfiguredTask): + def run(self): + pass + + +class TestConfiguredTask(BaseFileTest): def setUp(self): super().setUp() self.write_project(self.default_project_data) @@ -921,17 +926,17 @@ def tearDown(self): # so it's necessary to change it back at the end. os.chdir(INITIAL_ROOT) - def test_run_operation_task(self): + def test_configured_task_dir_change(self): self.assertEqual(os.getcwd(), INITIAL_ROOT) self.assertNotEqual(INITIAL_ROOT, self.project_dir) - new_task = RunOperationTask.from_args(self.args) + new_task = InheritsFromConfiguredTask.from_args(self.args) self.assertEqual(os.path.realpath(os.getcwd()), os.path.realpath(self.project_dir)) - def test_run_operation_task_with_bad_path(self): + def test_configured_task_dir_change_with_bad_path(self): self.args.project_dir = 'bad_path' with self.assertRaises(dbt.exceptions.RuntimeException): - new_task = RunOperationTask.from_args(self.args) + new_task = InheritsFromConfiguredTask.from_args(self.args) class TestVariableProjectFile(BaseFileTest):