diff --git a/.changes/unreleased/Under the Hood-20230110-115725.yaml b/.changes/unreleased/Under the Hood-20230110-115725.yaml new file mode 100644 index 00000000000..81dfefdba91 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20230110-115725.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Abstract manifest generation +time: 2023-01-10T11:57:25.193965-06:00 +custom: + Author: stu-k + Issue: "6357" diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 57dc01f4aa6..1adc432daf2 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -5,7 +5,6 @@ import click from dbt.cli import requires, params as p -from dbt.config import RuntimeConfig from dbt.config.project import Project from dbt.config.profile import Profile from dbt.contracts.graph.manifest import Manifest @@ -120,10 +119,15 @@ def cli(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def build(ctx, **kwargs): """Run all Seeds, Models, Snapshots, and tests in DAG order""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = BuildTask(ctx.obj["flags"], config) + task = BuildTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -166,7 +170,6 @@ def docs(ctx, **kwargs): @p.models @p.profile @p.profiles_dir -@p.project_dir @p.select @p.selector @p.state @@ -178,10 +181,11 @@ def docs(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def docs_generate(ctx, **kwargs): """Generate the documentation website for your project""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = GenerateTask(ctx.obj["flags"], config) + task = GenerateTask(ctx.obj["flags"], ctx.obj["runtime_config"]) results = task.run() success = task.interpret_results(results) @@ -225,11 +229,18 @@ def docs_serve(ctx, **kwargs): @p.vars @p.version_check @requires.preflight +@requires.profile +@requires.project +@requires.runtime_config +@requires.manifest def compile(ctx, **kwargs): """Generates executable SQL from source, model, test, and analysis files. Compiled SQL files are written to the target/ directory.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = CompileTask(ctx.obj["flags"], config) + task = CompileTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -267,7 +278,6 @@ def debug(ctx, **kwargs): def deps(ctx, **kwargs): """Pull the most recent version of the dependencies listed in packages.yml""" task = DepsTask(ctx.obj["flags"], ctx.obj["project"]) - results = task.run() success = task.interpret_results(results) return results, success @@ -311,10 +321,15 @@ def init(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def list(ctx, **kwargs): """List the resources in your project""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = ListTask(ctx.obj["flags"], config) + task = ListTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -341,9 +356,13 @@ def list(ctx, **kwargs): @p.version_check @p.write_manifest @requires.preflight +@requires.profile +@requires.project +@requires.runtime_config +@requires.manifest(write_perf_info=True) def parse(ctx, **kwargs): """Parses the project and provides information on performance""" - click.echo(f"`{inspect.stack()[0][3]}` called\n flags: {ctx.obj['flags']}") + # manifest generation and writing happens in @requires.manifest return None, True @@ -369,10 +388,15 @@ def parse(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def run(ctx, **kwargs): """Compile SQL and execute against the current target database.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = RunTask(ctx.obj["flags"], config) + task = RunTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -392,10 +416,15 @@ def run(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def run_operation(ctx, **kwargs): """Run the named macro with any supplied arguments.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = RunOperationTask(ctx.obj["flags"], config) + task = RunOperationTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -423,10 +452,15 @@ def run_operation(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def seed(ctx, **kwargs): """Load data from csv files into your data warehouse.""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = SeedTask(ctx.obj["flags"], config) + task = SeedTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -451,10 +485,15 @@ def seed(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def snapshot(ctx, **kwargs): """Execute snapshots defined in your project""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = SnapshotTask(ctx.obj["flags"], config) + task = SnapshotTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -486,10 +525,15 @@ def source(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def freshness(ctx, **kwargs): """check the current freshness of the project's sources""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = FreshnessTask(ctx.obj["flags"], config) + task = FreshnessTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) @@ -525,10 +569,15 @@ def freshness(ctx, **kwargs): @requires.preflight @requires.profile @requires.project +@requires.runtime_config +@requires.manifest def test(ctx, **kwargs): """Runs tests on data in deployed models. Run this after `dbt run`""" - config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"]) - task = TestTask(ctx.obj["flags"], config) + task = TestTask( + ctx.obj["flags"], + ctx.obj["runtime_config"], + ctx.obj["manifest"], + ) results = task.run() success = task.interpret_results(results) diff --git a/core/dbt/cli/requires.py b/core/dbt/cli/requires.py index 690c12bfa10..74ec80c986a 100644 --- a/core/dbt/cli/requires.py +++ b/core/dbt/cli/requires.py @@ -1,8 +1,10 @@ -from dbt.adapters.factory import adapter_management +from dbt.adapters.factory import adapter_management, register_adapter from dbt.cli.flags import Flags +from dbt.config import RuntimeConfig from dbt.config.runtime import load_project, load_profile from dbt.events.functions import setup_event_logger from dbt.exceptions import DbtProjectError +from dbt.parser.manifest import ManifestLoader, write_manifest from dbt.profiler import profiler from dbt.tracking import initialize_from_flags, track_run @@ -85,3 +87,70 @@ def wrapper(*args, **kwargs): return func(*args, **kwargs) return update_wrapper(wrapper, func) + + +def runtime_config(func): + """A decorator used by click command functions for generating a runtime + config given a profile and project. + """ + + def wrapper(*args, **kwargs): + ctx = args[0] + assert isinstance(ctx, Context) + + req_strs = ["profile", "project"] + reqs = [ctx.obj.get(req_str) for req_str in req_strs] + + if None in reqs: + raise DbtProjectError("profile and project required for runtime_config") + + ctx.obj["runtime_config"] = RuntimeConfig.from_parts( + ctx.obj["project"], + ctx.obj["profile"], + ctx.obj["flags"], + ) + + return func(*args, **kwargs) + + return update_wrapper(wrapper, func) + + +def manifest(*args0, write_perf_info=False): + """A decorator used by click command functions for generating a manifest + given a profile, project, and runtime config. This also registers the adaper + from the runtime config and writes the manifest to disc. + """ + + def outer_wrapper(func): + def wrapper(*args, **kwargs): + ctx = args[0] + assert isinstance(ctx, Context) + + req_strs = ["profile", "project", "runtime_config"] + reqs = [ctx.obj.get(dep) for dep in req_strs] + + if None in reqs: + raise DbtProjectError("profile, project, and runtime_config required for manifest") + + runtime_config = ctx.obj["runtime_config"] + register_adapter(runtime_config) + + # a manifest has already been set on the context, so don't overwrite it + if ctx.obj.get("manifest") is None: + manifest = ManifestLoader.get_full_manifest( + runtime_config, write_perf_info=write_perf_info + ) + + ctx.obj["manifest"] = manifest + if ctx.obj["flags"].write_json: + write_manifest(manifest, ctx.obj["runtime_config"].target_path) + + return func(*args, **kwargs) + + return update_wrapper(wrapper, func) + + # if there are no args, the decorator was used without params @decorator + # otherwise, the decorator was called with params @decorator(arg) + if len(args0) == 0: + return outer_wrapper + return outer_wrapper(args0[0]) diff --git a/core/dbt/docs/build/doctrees/environment.pickle b/core/dbt/docs/build/doctrees/environment.pickle index a9c2d07f929..3c70ad3fe4e 100644 Binary files a/core/dbt/docs/build/doctrees/environment.pickle and b/core/dbt/docs/build/doctrees/environment.pickle differ diff --git a/core/dbt/docs/build/doctrees/index.doctree b/core/dbt/docs/build/doctrees/index.doctree index 19135572dd5..45098290ce9 100644 Binary files a/core/dbt/docs/build/doctrees/index.doctree and b/core/dbt/docs/build/doctrees/index.doctree differ diff --git a/core/dbt/docs/build/html/_sources/index.rst.txt b/core/dbt/docs/build/html/_sources/index.rst.txt index 93d34a648f2..dcd1c82499f 100644 --- a/core/dbt/docs/build/html/_sources/index.rst.txt +++ b/core/dbt/docs/build/html/_sources/index.rst.txt @@ -6,6 +6,7 @@ How to invoke dbt commands in python runtime Right now the best way to invoke a command from python runtime is to use the `dbtRunner` we exposed .. code-block:: python + from dbt.cli.main import dbtRunner cli_args = ['run', '--project-dir', 'jaffle_shop'] diff --git a/core/dbt/docs/build/html/index.html b/core/dbt/docs/build/html/index.html index d0fd61a3227..a8b024ba555 100644 --- a/core/dbt/docs/build/html/index.html +++ b/core/dbt/docs/build/html/index.html @@ -35,6 +35,15 @@
Right now the best way to invoke a command from python runtime is to use the dbtRunner we exposed
+from dbt.cli.main import dbtRunner
+cli_args = ['run', '--project-dir', 'jaffle_shop']
+
+# initialize the dbt runner
+dbt = dbtRunner()
+# run the command
+res, success = dbt.invoke(args)
+
You can also pass in pre constructed object into dbtRunner, and we will use those objects instead of loading up from the disk.
# preload profile and project
profile = load_profile(project_dir, {}, 'testing-postgres')
@@ -92,6 +101,11 @@ project_dir
+resource_types¶
+Type: unknown
+TODO: No current help text
+
select¶
Type: unknown
@@ -313,6 +327,10 @@ vars¶
Command: init¶
+
+project_name¶
+Type: string
+
profile¶
Type: string
@@ -618,6 +636,10 @@ version_check¶
+
+macro¶
+Type: string
+
args¶
Type: YAML
diff --git a/core/dbt/docs/build/html/searchindex.js b/core/dbt/docs/build/html/searchindex.js
index 3ed297346d9..36036732601 100644
--- a/core/dbt/docs/build/html/searchindex.js
+++ b/core/dbt/docs/build/html/searchindex.js
@@ -1 +1 @@
-Search.setIndex({"docnames": ["index"], "filenames": ["index.rst"], "titles": ["dbt-core\u2019s API documentation"], "terms": {"right": 0, "now": 0, "best": 0, "wai": 0, "from": 0, "i": 0, "us": 0, "dbtrunner": 0, "we": 0, "expos": 0, "you": 0, "can": 0, "also": 0, "pass": 0, "pre": 0, "construct": 0, "object": 0, "those": 0, "instead": 0, "load": 0, "up": 0, "disk": 0, "preload": 0, "project": 0, "load_profil": 0, "postgr": 0, "load_project": 0, "fals": 0, "initi": 0, "runner": 0, "thi": 0, "re": 0, "success": 0, "cli_arg": 0, "For": 0, "full": 0, "exampl": 0, "code": 0, "refer": 0, "cli": 0, "py": 0, "type": 0, "boolean": 0, "If": 0, "set": 0, "variabl": 0, "resolv": 0, "unselect": 0, "node": 0, "unknown": 0, "specifi": 0, "stop": 0, "execut": 0, "first": 0, "failur": 0, "drop": 0, "increment": 0, "fulli": 0, "recalcul": 0, "tabl": 0, "definit": 0, "choic": 0, "eager": 0, "cautiou": 0, "all": 0, "ar": 0, "adjac": 0, "resourc": 0, "even": 0, "thei": 0, "have": 0, "been": 0, "explicitli": 0, "string": 0, "which": 0, "overrid": 0, "dbt_project": 0, "yml": 0, "path": 0, "directori": 0, "look": 0, "file": 0, "current": 0, "work": 0, "home": 0, "default": 0, "its": 0, "parent": 0, "includ": 0, "The": 0, "name": 0, "defin": 0, "sampl": 0, "data": 0, "termin": 0, "given": 0, "json": 0, "compar": 0, "store": 0, "result": 0, "fail": 0, "row": 0, "databas": 0, "configur": 0, "onli": 0, "appli": 0, "dbt_target_path": 0, "int": 0, "number": 0, "while": 0, "yaml": 0, "suppli": 0, "argument": 0, "your": 0, "should": 0, "eg": 0, "my_vari": 0, "my_valu": 0, "ensur": 0, "version": 0, "match": 0, "one": 0, "requir": 0, "todo": 0, "No": 0, "help": 0, "text": 0, "avail": 0, "inform": 0, "skip": 0, "inter": 0, "setup": 0, "macro": 0, "dictionari": 0, "map": 0, "keyword": 0}, "objects": {}, "objtypes": {}, "objnames": {}, "titleterms": {"dbt": 0, "core": 0, "": 0, "api": 0, "document": 0, "how": 0, "invok": 0, "command": 0, "python": 0, "runtim": 0, "build": 0, "defer": 0, "exclud": 0, "fail_fast": 0, "full_refresh": 0, "indirect_select": 0, "profil": 0, "profiles_dir": 0, "project_dir": 0, "select": 0, "selector": 0, "show": 0, "state": 0, "store_failur": 0, "target": 0, "target_path": 0, "thread": 0, "var": 0, "version_check": 0, "clean": 0, "compil": 0, "model": 0, "parse_onli": 0, "debug": 0, "config_dir": 0, "dep": 0, "doc": 0, "init": 0, "skip_profile_setup": 0, "list": 0, "output": 0, "output_kei": 0, "resource_typ": 0, "pars": 0, "write_manifest": 0, "run": 0, "run_oper": 0, "arg": 0, "seed": 0, "snapshot": 0, "sourc": 0, "test": 0}, "envversion": {"sphinx.domains.c": 2, "sphinx.domains.changeset": 1, "sphinx.domains.citation": 1, "sphinx.domains.cpp": 8, "sphinx.domains.index": 1, "sphinx.domains.javascript": 2, "sphinx.domains.math": 2, "sphinx.domains.python": 3, "sphinx.domains.rst": 2, "sphinx.domains.std": 2, "sphinx": 57}, "alltitles": {"dbt-core\u2019s API documentation": [[0, "dbt-core-s-api-documentation"]], "How to invoke dbt commands in python runtime": [[0, "how-to-invoke-dbt-commands-in-python-runtime"]], "API documentation": [[0, "api-documentation"]], "Command: build": [[0, "dbt-section"]], "defer": [[0, "build|defer"], [0, "compile|defer"], [0, "run|defer"], [0, "snapshot|defer"], [0, "test|defer"]], "exclude": [[0, "build|exclude"], [0, "compile|exclude"], [0, "list|exclude"], [0, "list|exclude"], [0, "run|exclude"], [0, "seed|exclude"], [0, "snapshot|exclude"], [0, "test|exclude"]], "fail_fast": [[0, "build|fail_fast"], [0, "run|fail_fast"], [0, "test|fail_fast"]], "full_refresh": [[0, "build|full_refresh"], [0, "compile|full_refresh"], [0, "run|full_refresh"], [0, "seed|full_refresh"]], "indirect_selection": [[0, "build|indirect_selection"], [0, "list|indirect_selection"], [0, "list|indirect_selection"], [0, "test|indirect_selection"]], "profile": [[0, "build|profile"], [0, "clean|profile"], [0, "compile|profile"], [0, "debug|profile"], [0, "deps|profile"], [0, "init|profile"], [0, "list|profile"], [0, "list|profile"], [0, "parse|profile"], [0, "run|profile"], [0, "run-operation|profile"], [0, "seed|profile"], [0, "snapshot|profile"], [0, "test|profile"]], "profiles_dir": [[0, "build|profiles_dir"], [0, "clean|profiles_dir"], [0, "compile|profiles_dir"], [0, "debug|profiles_dir"], [0, "deps|profiles_dir"], [0, "init|profiles_dir"], [0, "list|profiles_dir"], [0, "list|profiles_dir"], [0, "parse|profiles_dir"], [0, "run|profiles_dir"], [0, "run-operation|profiles_dir"], [0, "seed|profiles_dir"], [0, "snapshot|profiles_dir"], [0, "test|profiles_dir"]], "project_dir": [[0, "build|project_dir"], [0, "clean|project_dir"], [0, "compile|project_dir"], [0, "debug|project_dir"], [0, "deps|project_dir"], [0, "init|project_dir"], [0, "list|project_dir"], [0, "list|project_dir"], [0, "parse|project_dir"], [0, "run|project_dir"], [0, "run-operation|project_dir"], [0, "seed|project_dir"], [0, "snapshot|project_dir"], [0, "test|project_dir"]], "select": [[0, "build|select"], [0, "compile|select"], [0, "list|select"], [0, "list|select"], [0, "run|select"], [0, "seed|select"], [0, "snapshot|select"], [0, "test|select"]], "selector": [[0, "build|selector"], [0, "compile|selector"], [0, "list|selector"], [0, "list|selector"], [0, "run|selector"], [0, "seed|selector"], [0, "snapshot|selector"], [0, "test|selector"]], "show": [[0, "build|show"], [0, "seed|show"]], "state": [[0, "build|state"], [0, "compile|state"], [0, "list|state"], [0, "list|state"], [0, "run|state"], [0, "seed|state"], [0, "snapshot|state"], [0, "test|state"]], "store_failures": [[0, "build|store_failures"], [0, "test|store_failures"]], "target": [[0, "build|target"], [0, "clean|target"], [0, "compile|target"], [0, "debug|target"], [0, "deps|target"], [0, "init|target"], [0, "list|target"], [0, "list|target"], [0, "parse|target"], [0, "run|target"], [0, "run-operation|target"], [0, "seed|target"], [0, "snapshot|target"], [0, "test|target"]], "target_path": [[0, "build|target_path"], [0, "compile|target_path"], [0, "parse|target_path"], [0, "run|target_path"], [0, "seed|target_path"], [0, "test|target_path"]], "threads": [[0, "build|threads"], [0, "compile|threads"], [0, "parse|threads"], [0, "run|threads"], [0, "seed|threads"], [0, "snapshot|threads"], [0, "test|threads"]], "vars": [[0, "build|vars"], [0, "clean|vars"], [0, "compile|vars"], [0, "debug|vars"], [0, "deps|vars"], [0, "init|vars"], [0, "list|vars"], [0, "list|vars"], [0, "parse|vars"], [0, "run|vars"], [0, "run-operation|vars"], [0, "seed|vars"], [0, "snapshot|vars"], [0, "test|vars"]], "version_check": [[0, "build|version_check"], [0, "compile|version_check"], [0, "debug|version_check"], [0, "parse|version_check"], [0, "run|version_check"], [0, "seed|version_check"], [0, "test|version_check"]], "Command: clean": [[0, "dbt-section"]], "Command: compile": [[0, "dbt-section"]], "models": [[0, "compile|models"], [0, "list|models"], [0, "list|models"], [0, "run|models"], [0, "seed|models"], [0, "snapshot|models"], [0, "test|models"]], "parse_only": [[0, "compile|parse_only"]], "Command: debug": [[0, "dbt-section"]], "config_dir": [[0, "debug|config_dir"]], "Command: deps": [[0, "dbt-section"]], "Command: docs": [[0, "dbt-section"]], "Command: init": [[0, "dbt-section"]], "skip_profile_setup": [[0, "init|skip_profile_setup"]], "Command: list": [[0, "dbt-section"], [0, "dbt-section"]], "output": [[0, "list|output"], [0, "list|output"]], "output_keys": [[0, "list|output_keys"], [0, "list|output_keys"]], "resource_types": [[0, "list|resource_types"], [0, "list|resource_types"]], "Command: parse": [[0, "dbt-section"]], "compile": [[0, "parse|compile"]], "write_manifest": [[0, "parse|write_manifest"]], "Command: run": [[0, "dbt-section"]], "Command: run_operation": [[0, "dbt-section"]], "args": [[0, "run-operation|args"]], "Command: seed": [[0, "dbt-section"]], "Command: snapshot": [[0, "dbt-section"]], "Command: source": [[0, "dbt-section"]], "Command: test": [[0, "dbt-section"]]}, "indexentries": {}})
\ No newline at end of file
+Search.setIndex({"docnames": ["index"], "filenames": ["index.rst"], "titles": ["dbt-core\u2019s API documentation"], "terms": {"right": 0, "now": 0, "best": 0, "wai": 0, "from": 0, "i": 0, "us": 0, "dbtrunner": 0, "we": 0, "expos": 0, "cli": 0, "main": 0, "import": 0, "cli_arg": 0, "project": 0, "dir": 0, "jaffle_shop": 0, "initi": 0, "runner": 0, "re": 0, "success": 0, "you": 0, "can": 0, "also": 0, "pass": 0, "pre": 0, "construct": 0, "object": 0, "those": 0, "instead": 0, "load": 0, "up": 0, "disk": 0, "preload": 0, "load_profil": 0, "postgr": 0, "load_project": 0, "fals": 0, "thi": 0, "For": 0, "full": 0, "exampl": 0, "code": 0, "refer": 0, "py": 0, "type": 0, "boolean": 0, "If": 0, "set": 0, "variabl": 0, "resolv": 0, "unselect": 0, "node": 0, "unknown": 0, "specifi": 0, "stop": 0, "execut": 0, "first": 0, "failur": 0, "drop": 0, "increment": 0, "fulli": 0, "recalcul": 0, "tabl": 0, "definit": 0, "choic": 0, "eager": 0, "cautiou": 0, "all": 0, "ar": 0, "adjac": 0, "resourc": 0, "even": 0, "thei": 0, "have": 0, "been": 0, "explicitli": 0, "string": 0, "which": 0, "overrid": 0, "dbt_project": 0, "yml": 0, "path": 0, "directori": 0, "look": 0, "file": 0, "current": 0, "work": 0, "home": 0, "default": 0, "its": 0, "parent": 0, "todo": 0, "No": 0, "help": 0, "text": 0, "includ": 0, "The": 0, "name": 0, "defin": 0, "sampl": 0, "data": 0, "termin": 0, "given": 0, "json": 0, "compar": 0, "store": 0, "result": 0, "fail": 0, "row": 0, "databas": 0, "configur": 0, "onli": 0, "appli": 0, "dbt_target_path": 0, "int": 0, "number": 0, "while": 0, "yaml": 0, "suppli": 0, "argument": 0, "your": 0, "should": 0, "eg": 0, "my_vari": 0, "my_valu": 0, "ensur": 0, "version": 0, "match": 0, "one": 0, "requir": 0, "avail": 0, "inform": 0, "skip": 0, "inter": 0, "setup": 0, "dictionari": 0, "map": 0, "keyword": 0}, "objects": {}, "objtypes": {}, "objnames": {}, "titleterms": {"dbt": 0, "core": 0, "": 0, "api": 0, "document": 0, "how": 0, "invok": 0, "command": 0, "python": 0, "runtim": 0, "build": 0, "defer": 0, "exclud": 0, "fail_fast": 0, "full_refresh": 0, "indirect_select": 0, "profil": 0, "profiles_dir": 0, "project_dir": 0, "resource_typ": 0, "select": 0, "selector": 0, "show": 0, "state": 0, "store_failur": 0, "target": 0, "target_path": 0, "thread": 0, "var": 0, "version_check": 0, "clean": 0, "compil": 0, "model": 0, "parse_onli": 0, "debug": 0, "config_dir": 0, "dep": 0, "doc": 0, "init": 0, "project_nam": 0, "skip_profile_setup": 0, "list": 0, "output": 0, "output_kei": 0, "pars": 0, "write_manifest": 0, "run": 0, "run_oper": 0, "macro": 0, "arg": 0, "seed": 0, "snapshot": 0, "sourc": 0, "test": 0}, "envversion": {"sphinx.domains.c": 2, "sphinx.domains.changeset": 1, "sphinx.domains.citation": 1, "sphinx.domains.cpp": 8, "sphinx.domains.index": 1, "sphinx.domains.javascript": 2, "sphinx.domains.math": 2, "sphinx.domains.python": 3, "sphinx.domains.rst": 2, "sphinx.domains.std": 2, "sphinx": 57}, "alltitles": {"dbt-core\u2019s API documentation": [[0, "dbt-core-s-api-documentation"]], "How to invoke dbt commands in python runtime": [[0, "how-to-invoke-dbt-commands-in-python-runtime"]], "API documentation": [[0, "api-documentation"]], "Command: build": [[0, "dbt-section"]], "defer": [[0, "build|defer"], [0, "compile|defer"], [0, "run|defer"], [0, "snapshot|defer"], [0, "test|defer"]], "exclude": [[0, "build|exclude"], [0, "compile|exclude"], [0, "list|exclude"], [0, "list|exclude"], [0, "run|exclude"], [0, "seed|exclude"], [0, "snapshot|exclude"], [0, "test|exclude"]], "fail_fast": [[0, "build|fail_fast"], [0, "run|fail_fast"], [0, "test|fail_fast"]], "full_refresh": [[0, "build|full_refresh"], [0, "compile|full_refresh"], [0, "run|full_refresh"], [0, "seed|full_refresh"]], "indirect_selection": [[0, "build|indirect_selection"], [0, "list|indirect_selection"], [0, "list|indirect_selection"], [0, "test|indirect_selection"]], "profile": [[0, "build|profile"], [0, "clean|profile"], [0, "compile|profile"], [0, "debug|profile"], [0, "deps|profile"], [0, "init|profile"], [0, "list|profile"], [0, "list|profile"], [0, "parse|profile"], [0, "run|profile"], [0, "run-operation|profile"], [0, "seed|profile"], [0, "snapshot|profile"], [0, "test|profile"]], "profiles_dir": [[0, "build|profiles_dir"], [0, "clean|profiles_dir"], [0, "compile|profiles_dir"], [0, "debug|profiles_dir"], [0, "deps|profiles_dir"], [0, "init|profiles_dir"], [0, "list|profiles_dir"], [0, "list|profiles_dir"], [0, "parse|profiles_dir"], [0, "run|profiles_dir"], [0, "run-operation|profiles_dir"], [0, "seed|profiles_dir"], [0, "snapshot|profiles_dir"], [0, "test|profiles_dir"]], "project_dir": [[0, "build|project_dir"], [0, "clean|project_dir"], [0, "compile|project_dir"], [0, "debug|project_dir"], [0, "deps|project_dir"], [0, "init|project_dir"], [0, "list|project_dir"], [0, "list|project_dir"], [0, "parse|project_dir"], [0, "run|project_dir"], [0, "run-operation|project_dir"], [0, "seed|project_dir"], [0, "snapshot|project_dir"], [0, "test|project_dir"]], "resource_types": [[0, "build|resource_types"], [0, "list|resource_types"], [0, "list|resource_types"]], "select": [[0, "build|select"], [0, "compile|select"], [0, "list|select"], [0, "list|select"], [0, "run|select"], [0, "seed|select"], [0, "snapshot|select"], [0, "test|select"]], "selector": [[0, "build|selector"], [0, "compile|selector"], [0, "list|selector"], [0, "list|selector"], [0, "run|selector"], [0, "seed|selector"], [0, "snapshot|selector"], [0, "test|selector"]], "show": [[0, "build|show"], [0, "seed|show"]], "state": [[0, "build|state"], [0, "compile|state"], [0, "list|state"], [0, "list|state"], [0, "run|state"], [0, "seed|state"], [0, "snapshot|state"], [0, "test|state"]], "store_failures": [[0, "build|store_failures"], [0, "test|store_failures"]], "target": [[0, "build|target"], [0, "clean|target"], [0, "compile|target"], [0, "debug|target"], [0, "deps|target"], [0, "init|target"], [0, "list|target"], [0, "list|target"], [0, "parse|target"], [0, "run|target"], [0, "run-operation|target"], [0, "seed|target"], [0, "snapshot|target"], [0, "test|target"]], "target_path": [[0, "build|target_path"], [0, "compile|target_path"], [0, "parse|target_path"], [0, "run|target_path"], [0, "seed|target_path"], [0, "test|target_path"]], "threads": [[0, "build|threads"], [0, "compile|threads"], [0, "parse|threads"], [0, "run|threads"], [0, "seed|threads"], [0, "snapshot|threads"], [0, "test|threads"]], "vars": [[0, "build|vars"], [0, "clean|vars"], [0, "compile|vars"], [0, "debug|vars"], [0, "deps|vars"], [0, "init|vars"], [0, "list|vars"], [0, "list|vars"], [0, "parse|vars"], [0, "run|vars"], [0, "run-operation|vars"], [0, "seed|vars"], [0, "snapshot|vars"], [0, "test|vars"]], "version_check": [[0, "build|version_check"], [0, "compile|version_check"], [0, "debug|version_check"], [0, "parse|version_check"], [0, "run|version_check"], [0, "seed|version_check"], [0, "test|version_check"]], "Command: clean": [[0, "dbt-section"]], "Command: compile": [[0, "dbt-section"]], "models": [[0, "compile|models"], [0, "list|models"], [0, "list|models"], [0, "run|models"], [0, "seed|models"], [0, "snapshot|models"], [0, "test|models"]], "parse_only": [[0, "compile|parse_only"]], "Command: debug": [[0, "dbt-section"]], "config_dir": [[0, "debug|config_dir"]], "Command: deps": [[0, "dbt-section"]], "Command: docs": [[0, "dbt-section"]], "Command: init": [[0, "dbt-section"]], "project_name": [[0, "init|project_name"]], "skip_profile_setup": [[0, "init|skip_profile_setup"]], "Command: list": [[0, "dbt-section"], [0, "dbt-section"]], "output": [[0, "list|output"], [0, "list|output"]], "output_keys": [[0, "list|output_keys"], [0, "list|output_keys"]], "Command: parse": [[0, "dbt-section"]], "compile": [[0, "parse|compile"]], "write_manifest": [[0, "parse|write_manifest"]], "Command: run": [[0, "dbt-section"]], "Command: run_operation": [[0, "dbt-section"]], "macro": [[0, "run-operation|macro"]], "args": [[0, "run-operation|args"]], "Command: seed": [[0, "dbt-section"]], "Command: snapshot": [[0, "dbt-section"]], "Command: source": [[0, "dbt-section"]], "Command: test": [[0, "dbt-section"]]}, "indexentries": {}})
\ No newline at end of file
diff --git a/core/dbt/docs/source/_ext/dbt_click.py b/core/dbt/docs/source/_ext/dbt_click.py
index 1431ca91e41..7343cc6a110 100644
--- a/core/dbt/docs/source/_ext/dbt_click.py
+++ b/core/dbt/docs/source/_ext/dbt_click.py
@@ -44,7 +44,9 @@ def format_params(cmd) -> t.List[nodes.section]:
type_str = get_type_str(param.type)
param_section.append(nodes.paragraph(text=f"Type: {type_str}"))
- param_section.append(nodes.paragraph(text=param.help))
+ help_txt = getattr(param, "help", None)
+ if help_txt is not None:
+ param_section.append(nodes.paragraph(text=help_txt))
lines.append(param_section)
return lines
diff --git a/core/dbt/docs/source/index.rst b/core/dbt/docs/source/index.rst
index 93d34a648f2..dcd1c82499f 100644
--- a/core/dbt/docs/source/index.rst
+++ b/core/dbt/docs/source/index.rst
@@ -6,6 +6,7 @@ How to invoke dbt commands in python runtime
Right now the best way to invoke a command from python runtime is to use the `dbtRunner` we exposed
.. code-block:: python
+
from dbt.cli.main import dbtRunner
cli_args = ['run', '--project-dir', 'jaffle_shop']
diff --git a/core/dbt/main.py b/core/dbt/main.py
index 3c23cfec4b3..d651c073765 100644
--- a/core/dbt/main.py
+++ b/core/dbt/main.py
@@ -30,7 +30,6 @@
import dbt.task.generate as generate_task
import dbt.task.init as init_task
import dbt.task.list as list_task
-import dbt.task.parse as parse_task
import dbt.task.run as run_task
import dbt.task.run_operation as run_operation_task
import dbt.task.seed as seed_task
@@ -541,7 +540,9 @@ def _build_parse_subparser(subparsers, base_subparser):
Parses the project and provides information on performance
""",
)
- sub.set_defaults(cls=parse_task.ParseTask, which="parse", rpc_method="parse")
+ # NOTE: changing this cls to None is breaking, but this file should go
+ # away once merging the click work
+ sub.set_defaults(cls=None, which="parse", rpc_method="parse")
sub.add_argument("--write-manifest", action="store_true")
sub.add_argument("--compile", action="store_true")
return sub
diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py
index 787b70cfeaf..48eb9fca9c5 100644
--- a/core/dbt/parser/manifest.py
+++ b/core/dbt/parser/manifest.py
@@ -7,9 +7,11 @@
from typing import Dict, Optional, Mapping, Callable, Any, List, Type, Union, Tuple
from itertools import chain
import time
+import json
import dbt.exceptions
import dbt.tracking
+import dbt.utils
import dbt.flags as flags
from dbt.adapters.factory import (
@@ -20,6 +22,7 @@
from dbt.helper_types import PathSet
from dbt.events.functions import fire_event, get_invocation_id, warn_or_error
from dbt.events.types import (
+ ParseCmdPerfInfoPath,
PartialParsingFullReparseBecauseOfError,
PartialParsingExceptionFile,
PartialParsingFile,
@@ -44,7 +47,7 @@
from dbt.node_types import NodeType
from dbt.clients.jinja import get_rendered, MacroStack
from dbt.clients.jinja_static import statically_extract_macro_calls
-from dbt.clients.system import make_directory
+from dbt.clients.system import make_directory, write_file
from dbt.config import Project, RuntimeConfig
from dbt.context.docs import generate_runtime_docs_context
from dbt.context.macro_resolver import MacroResolver, TestMacroNamespace
@@ -89,8 +92,10 @@
from dbt.dataclass_schema import StrEnum, dbtClassMixin
+MANIFEST_FILE_NAME = "manifest.json"
PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
PARSING_STATE = DbtProcessState("parsing")
+PERF_INFO_FILE_NAME = "perf_info.json"
class ReparseReason(StrEnum):
@@ -193,6 +198,7 @@ def get_full_manifest(
config: RuntimeConfig,
*,
reset: bool = False,
+ write_perf_info=False,
) -> Manifest:
adapter = get_adapter(config) # type: ignore
@@ -223,6 +229,9 @@ def get_full_manifest(
loader._perf_info.load_all_elapsed = time.perf_counter() - start_load_all
loader.track_project_load()
+ if write_perf_info:
+ loader.write_perf_info(config.target_path)
+
return manifest
# This is where the main action happens
@@ -954,6 +963,11 @@ def process_nodes(self):
self.manifest.rebuild_ref_lookup()
+ def write_perf_info(self, target_path: str):
+ path = os.path.join(target_path, PERF_INFO_FILE_NAME)
+ write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
+ fire_event(ParseCmdPerfInfoPath(path=path))
+
def invalid_target_fail_unless_test(
node,
@@ -1378,3 +1392,8 @@ def process_node(config: RuntimeConfig, manifest: Manifest, node: ManifestNode):
_process_refs_for_node(manifest, config.project_name, node)
ctx = generate_runtime_docs_context(config, node, manifest, config.project_name)
_process_docs_for_node(ctx, node)
+
+
+def write_manifest(manifest: Manifest, target_path: str):
+ path = os.path.join(target_path, MANIFEST_FILE_NAME)
+ manifest.write(path)
diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py
index f6d37937e99..1a948b48b2e 100644
--- a/core/dbt/task/base.py
+++ b/core/dbt/task/base.py
@@ -45,10 +45,11 @@
from dbt.events.contextvars import get_node_info
from .printer import print_run_result_error
-from dbt.adapters.factory import register_adapter
+from dbt.adapters.factory import get_adapter
from dbt.config import RuntimeConfig, Project
from dbt.config.profile import read_profile
import dbt.exceptions
+from dbt.graph import Graph
class NoneConfig:
@@ -96,7 +97,7 @@ def set_log_format(cls):
log_manager.format_text()
@classmethod
- def from_args(cls, args):
+ def from_args(cls, args, *pargs, **kwargs):
try:
# This is usually RuntimeConfig
config = cls.ConfigType.from_args(args)
@@ -123,7 +124,7 @@ def from_args(cls, args):
tracking.track_invalid_invocation(args=args, result_type=exc.result_type)
raise dbt.exceptions.RuntimeException("Could not run dbt") from exc
- return cls(args, config)
+ return cls(args, config, *pargs, **kwargs)
@abstractmethod
def run(self):
@@ -167,17 +168,36 @@ def move_to_nearest_project_dir(project_dir: Optional[str]) -> str:
return nearest_project_dir
+# TODO: look into deprecating this class in favor of several small functions that
+# produce the same behavior. currently this class only contains manifest compilation,
+# holding a manifest, and moving direcories.
class ConfiguredTask(BaseTask):
ConfigType = RuntimeConfig
- def __init__(self, args, config):
+ def __init__(self, args, config, manifest: Optional[Manifest] = None):
super().__init__(args, config)
- register_adapter(self.config)
+ self.graph: Optional[Graph] = None
+ self.manifest = manifest
+
+ def compile_manifest(self):
+ if self.manifest is None:
+ raise InternalException("compile_manifest called before manifest was loaded")
+
+ start_compile_manifest = time.perf_counter()
+
+ # we cannot get adapter in init since it will break rpc #5579
+ adapter = get_adapter(self.config)
+ compiler = adapter.get_compiler()
+ self.graph = compiler.compile(self.manifest)
+
+ compile_time = time.perf_counter() - start_compile_manifest
+ if dbt.tracking.active_user is not None:
+ dbt.tracking.track_runnable_timing({"graph_compilation_elapsed": compile_time})
@classmethod
- def from_args(cls, args):
+ def from_args(cls, args, *pargs, **kwargs):
move_to_nearest_project_dir(args.project_dir)
- return super().from_args(args)
+ return super().from_args(args, *pargs, **kwargs)
class ExecutionContext:
diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py
index 740d35d37e9..92eedcd97d3 100644
--- a/core/dbt/task/compile.py
+++ b/core/dbt/task/compile.py
@@ -10,6 +10,7 @@
from dbt.graph import ResourceTypeSelector
from dbt.events.functions import fire_event
from dbt.events.types import CompileComplete
+from dbt.parser.manifest import write_manifest
from dbt.node_types import NodeType
@@ -85,4 +86,4 @@ def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
selected=selected_uids,
)
# TODO: is it wrong to write the manifest here? I think it's right...
- self.write_manifest()
+ write_manifest(self.manifest, self.config.target_path)
diff --git a/core/dbt/task/generate.py b/core/dbt/task/generate.py
index 87723a530a1..119a32acf42 100644
--- a/core/dbt/task/generate.py
+++ b/core/dbt/task/generate.py
@@ -31,7 +31,7 @@
CannotGenerateDocs,
BuildingCatalog,
)
-from dbt.parser.manifest import ManifestLoader
+from dbt.parser.manifest import write_manifest
import dbt.utils
import dbt.compilation
import dbt.exceptions
@@ -199,11 +199,6 @@ def get_unique_id_mapping(
class GenerateTask(CompileTask):
- def _get_manifest(self) -> Manifest:
- if self.manifest is None:
- raise InternalException("manifest should not be None in _get_manifest")
- return self.manifest
-
def run(self) -> CatalogArtifact:
compile_results = None
if self.args.compile:
@@ -217,8 +212,6 @@ def run(self) -> CatalogArtifact:
errors=None,
compile_results=compile_results,
)
- else:
- self.manifest = ManifestLoader.get_full_manifest(self.config)
shutil.copyfile(DOCS_INDEX_FILE_PATH, os.path.join(self.config.target_path, "index.html"))
@@ -262,7 +255,7 @@ def run(self) -> CatalogArtifact:
path = os.path.join(self.config.target_path, CATALOG_FILENAME)
results.write(path)
if self.args.compile:
- self.write_manifest()
+ write_manifest(self.manifest, self.config.target_path)
if exceptions:
fire_event(WriteCatalogFailure(num_exceptions=len(exceptions)))
diff --git a/core/dbt/task/list.py b/core/dbt/task/list.py
index e1be8f214d3..a0b549f620f 100644
--- a/core/dbt/task/list.py
+++ b/core/dbt/task/list.py
@@ -2,7 +2,7 @@
from dbt.contracts.graph.nodes import Exposure, SourceDefinition, Metric
from dbt.graph import ResourceTypeSelector
-from dbt.task.runnable import GraphRunnableTask, ManifestTask
+from dbt.task.runnable import GraphRunnableTask
from dbt.task.test import TestSelector
from dbt.node_types import NodeType
from dbt.events.functions import warn_or_error
@@ -40,8 +40,8 @@ class ListTask(GraphRunnableTask):
)
)
- def __init__(self, args, config):
- super().__init__(args, config)
+ def __init__(self, args, config, manifest):
+ super().__init__(args, config, manifest)
if self.args.models:
if self.args.select:
raise RuntimeException('"models" and "select" are mutually exclusive arguments')
@@ -132,7 +132,7 @@ def generate_paths(self):
yield node.original_file_path
def run(self):
- ManifestTask._runtime_initialize(self)
+ self.compile_manifest()
output = self.args.output
if output == "selector":
generator = self.generate_selectors
diff --git a/core/dbt/task/parse.py b/core/dbt/task/parse.py
deleted file mode 100644
index 5460bf0f3d0..00000000000
--- a/core/dbt/task/parse.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# This task is intended to be used for diagnosis, development and
-# performance analysis.
-# It separates out the parsing flows for easier logging and
-# debugging.
-# To store cProfile performance data, execute with the '-r'
-# flag and an output file: dbt -r dbt.cprof parse.
-# Use a visualizer such as snakeviz to look at the output:
-# snakeviz dbt.cprof
-from dbt.task.base import ConfiguredTask
-from dbt.adapters.factory import get_adapter
-from dbt.parser.manifest import Manifest, ManifestLoader, _check_manifest
-from dbt.logger import DbtProcessState
-from dbt.clients.system import write_file
-from dbt.events.types import (
- ManifestDependenciesLoaded,
- ManifestLoaderCreated,
- ManifestLoaded,
- ManifestChecked,
- ManifestFlatGraphBuilt,
- ParseCmdStart,
- ParseCmdCompiling,
- ParseCmdWritingManifest,
- ParseCmdDone,
- ParseCmdPerfInfoPath,
-)
-from dbt.events.functions import fire_event
-from dbt.graph import Graph
-import time
-from typing import Optional
-import os
-import json
-import dbt.utils
-
-MANIFEST_FILE_NAME = "manifest.json"
-PERF_INFO_FILE_NAME = "perf_info.json"
-PARSING_STATE = DbtProcessState("parsing")
-
-
-class ParseTask(ConfiguredTask):
- def __init__(self, args, config):
- super().__init__(args, config)
- self.manifest: Optional[Manifest] = None
- self.graph: Optional[Graph] = None
- self.loader: Optional[ManifestLoader] = None
-
- def write_manifest(self):
- path = os.path.join(self.config.target_path, MANIFEST_FILE_NAME)
- self.manifest.write(path)
-
- def write_perf_info(self):
- path = os.path.join(self.config.target_path, PERF_INFO_FILE_NAME)
- write_file(path, json.dumps(self.loader._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
- fire_event(ParseCmdPerfInfoPath(path=path))
-
- # This method takes code that normally exists in other files
- # and pulls it in here, to simplify logging and make the
- # parsing flow-of-control easier to understand and manage,
- # with the downside that if changes happen in those other methods,
- # similar changes might need to be made here.
- # ManifestLoader.get_full_manifest
- # ManifestLoader.load
- # ManifestLoader.load_all
-
- def get_full_manifest(self):
- adapter = get_adapter(self.config) # type: ignore
- root_config = self.config
- macro_hook = adapter.connections.set_query_header
- with PARSING_STATE:
- start_load_all = time.perf_counter()
- projects = root_config.load_dependencies()
- fire_event(ManifestDependenciesLoaded())
- loader = ManifestLoader(root_config, projects, macro_hook)
- fire_event(ManifestLoaderCreated())
- manifest = loader.load()
- fire_event(ManifestLoaded())
- _check_manifest(manifest, root_config)
- fire_event(ManifestChecked())
- manifest.build_flat_graph()
- fire_event(ManifestFlatGraphBuilt())
- loader._perf_info.load_all_elapsed = time.perf_counter() - start_load_all
-
- self.loader = loader
- self.manifest = manifest
- fire_event(ManifestLoaded())
-
- def compile_manifest(self):
- adapter = get_adapter(self.config)
- compiler = adapter.get_compiler()
- self.graph = compiler.compile(self.manifest)
-
- def run(self):
- fire_event(ParseCmdStart())
- self.get_full_manifest()
- if self.args.compile:
- fire_event(ParseCmdCompiling())
- self.compile_manifest()
- if self.args.write_manifest:
- fire_event(ParseCmdWritingManifest())
- self.write_manifest()
-
- self.write_perf_info()
- fire_event(ParseCmdDone())
diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py
index bc8f9a2de75..f21dfd570e9 100644
--- a/core/dbt/task/run.py
+++ b/core/dbt/task/run.py
@@ -284,8 +284,8 @@ def execute(self, model, manifest):
class RunTask(CompileTask):
- def __init__(self, args, config):
- super().__init__(args, config)
+ def __init__(self, args, config, manifest):
+ super().__init__(args, config, manifest)
self.ran_hooks = []
self._total_executed = 0
diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py
index 9d7a469efd8..70bf39042f7 100644
--- a/core/dbt/task/run_operation.py
+++ b/core/dbt/task/run_operation.py
@@ -3,12 +3,11 @@
import agate
-from .runnable import ManifestTask
+from .base import ConfiguredTask
import dbt.exceptions
from dbt.adapters.factory import get_adapter
from dbt.contracts.results import RunOperationResultsArtifact
-from dbt.exceptions import InternalException
from dbt.events.functions import fire_event
from dbt.events.types import (
RunningOperationCaughtError,
@@ -17,7 +16,7 @@
)
-class RunOperationTask(ManifestTask):
+class RunOperationTask(ConfiguredTask):
def _get_macro_parts(self):
macro_name = self.args.macro
if "." in macro_name:
@@ -27,10 +26,6 @@ def _get_macro_parts(self):
return package_name, macro_name
- def compile_manifest(self) -> None:
- if self.manifest is None:
- raise InternalException("manifest was None in compile_manifest")
-
def _run_unsafe(self) -> agate.Table:
adapter = get_adapter(self.config)
@@ -47,7 +42,7 @@ def _run_unsafe(self) -> agate.Table:
def run(self) -> RunOperationResultsArtifact:
start = datetime.utcnow()
- self._runtime_initialize()
+ self.compile_manifest()
try:
self._run_unsafe()
except dbt.exceptions.Exception as exc:
diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py
index fa8fdb724a8..e86cdbd2973 100644
--- a/core/dbt/task/runnable.py
+++ b/core/dbt/task/runnable.py
@@ -1,6 +1,5 @@
import os
import time
-import json
from pathlib import Path
from abc import abstractmethod
from concurrent.futures import as_completed
@@ -13,7 +12,6 @@
print_run_end_messages,
)
-from dbt.clients.system import write_file
from dbt.task.base import ConfiguredTask
from dbt.adapters.base import BaseRelation
from dbt.adapters.factory import get_adapter
@@ -39,7 +37,6 @@
NothingToDo,
)
from dbt.events.contextvars import log_contextvars
-from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import SourceDefinition, ResultNode
from dbt.contracts.results import NodeStatus, RunExecutionResult, RunningStatus
from dbt.contracts.state import PreviousState
@@ -50,8 +47,8 @@
FailFastException,
)
-from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, parse_difference, Graph
-from dbt.parser.manifest import ManifestLoader
+from dbt.graph import GraphQueue, NodeSelector, SelectionSpec, parse_difference
+from dbt.parser.manifest import write_manifest
import dbt.tracking
import dbt.exceptions
@@ -59,53 +56,15 @@
import dbt.utils
RESULT_FILE_NAME = "run_results.json"
-MANIFEST_FILE_NAME = "manifest.json"
RUNNING_STATE = DbtProcessState("running")
-class ManifestTask(ConfiguredTask):
- def __init__(self, args, config):
- super().__init__(args, config)
- self.manifest: Optional[Manifest] = None
- self.graph: Optional[Graph] = None
-
- def write_manifest(self):
- if flags.WRITE_JSON:
- path = os.path.join(self.config.target_path, MANIFEST_FILE_NAME)
- self.manifest.write(path)
- if os.getenv("DBT_WRITE_FILES"):
- path = os.path.join(self.config.target_path, "files.json")
- write_file(path, json.dumps(self.manifest.files, cls=dbt.utils.JSONEncoder, indent=4))
-
- def load_manifest(self):
- self.manifest = ManifestLoader.get_full_manifest(self.config)
- self.write_manifest()
-
- def compile_manifest(self):
- if self.manifest is None:
- raise InternalException("compile_manifest called before manifest was loaded")
-
- # we cannot get adapter in init since it will break rpc #5579
- adapter = get_adapter(self.config)
- compiler = adapter.get_compiler()
- self.graph = compiler.compile(self.manifest)
-
- def _runtime_initialize(self):
- self.load_manifest()
-
- start_compile_manifest = time.perf_counter()
- self.compile_manifest()
- compile_time = time.perf_counter() - start_compile_manifest
- if dbt.tracking.active_user is not None:
- dbt.tracking.track_runnable_timing({"graph_compilation_elapsed": compile_time})
-
-
-class GraphRunnableTask(ManifestTask):
+class GraphRunnableTask(ConfiguredTask):
MARK_DEPENDENT_ERRORS_STATUSES = [NodeStatus.Error]
- def __init__(self, args, config):
- super().__init__(args, config)
+ def __init__(self, args, config, manifest):
+ super().__init__(args, config, manifest)
self.job_queue: Optional[GraphQueue] = None
self._flattened_nodes: Optional[List[ResultNode]] = None
@@ -165,9 +124,9 @@ def get_graph_queue(self) -> GraphQueue:
return selector.get_graph_queue(spec)
def _runtime_initialize(self):
- super()._runtime_initialize()
+ self.compile_manifest()
if self.manifest is None or self.graph is None:
- raise InternalException("_runtime_initialize never loaded the manifest and graph!")
+ raise InternalException("_runtime_initialize never loaded the graph!")
self.job_queue = self.get_graph_queue()
@@ -490,7 +449,7 @@ def run(self):
)
if flags.WRITE_JSON:
- self.write_manifest()
+ write_manifest(self.manifest, self.config.target_path)
self.write_result(result)
self.task_end_messages(result.results)
diff --git a/test/unit/test_config.py b/test/unit/test_config.py
index 456f16fade6..d45ee86587d 100644
--- a/test/unit/test_config.py
+++ b/test/unit/test_config.py
@@ -12,15 +12,15 @@
import dbt.config
import dbt.exceptions
+import dbt.tracking
from dbt import flags
from dbt.adapters.factory import load_plugin
from dbt.adapters.postgres import PostgresCredentials
-from dbt.context.base import generate_base_context
from dbt.contracts.connection import QueryComment, DEFAULT_QUERY_COMMENT
from dbt.contracts.project import PackageConfig, LocalPackage, GitPackage
from dbt.node_types import NodeType
from dbt.semver import VersionSpecifier
-from dbt.task.run_operation import RunOperationTask
+from dbt.task.base import ConfiguredTask
from .utils import normalize
@@ -909,7 +909,12 @@ def test_with_invalid_package(self):
dbt.config.Project.from_project_root(self.project_dir, renderer)
-class TestRunOperationTask(BaseFileTest):
+class InheritsFromConfiguredTask(ConfiguredTask):
+ def run(self):
+ pass
+
+
+class TestConfiguredTask(BaseFileTest):
def setUp(self):
super().setUp()
self.write_project(self.default_project_data)
@@ -921,17 +926,17 @@ def tearDown(self):
# so it's necessary to change it back at the end.
os.chdir(INITIAL_ROOT)
- def test_run_operation_task(self):
+ def test_configured_task_dir_change(self):
self.assertEqual(os.getcwd(), INITIAL_ROOT)
self.assertNotEqual(INITIAL_ROOT, self.project_dir)
- new_task = RunOperationTask.from_args(self.args)
+ new_task = InheritsFromConfiguredTask.from_args(self.args)
self.assertEqual(os.path.realpath(os.getcwd()),
os.path.realpath(self.project_dir))
- def test_run_operation_task_with_bad_path(self):
+ def test_configured_task_dir_change_with_bad_path(self):
self.args.project_dir = 'bad_path'
with self.assertRaises(dbt.exceptions.RuntimeException):
- new_task = RunOperationTask.from_args(self.args)
+ new_task = InheritsFromConfiguredTask.from_args(self.args)
class TestVariableProjectFile(BaseFileTest):