forked from astronomer/astronomer-cosmos
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support partial parsing (astronomer#800)
## Description dbt uses `partial_parse.msgpack` to make rendering things a lot faster. This PR adds support for `partial_parse.msgpack` in the following places: - `ExecutionMode.LOCAL` - `ExecutionMode.VIRTUALENV` - `LoadMode.DBT_LS` This PR also allows users to explicitly _turn off_ partial parsing. If this is done, then `--no-partial-parse` will be passed as an explicit flag in all dbt command invocations (i.e. all `ExecutionMode`s and `LoadMode.DBT_LS`, albeit not the `dbt deps` invocation.) This should address some performance complaints that users have, e.g. this message from Slack: https://apache-airflow.slack.com/archives/C059CC42E9W/p1704483361206829 Albeit, this user will also need to provide a `partial_parse.msgpack`. My experimentation and looking at dbt-core source code confirms that dbt does not use `manifest.json` when partial parsing. It appears that these are just output artifacts, but not input artifacts. Only `partial_parse.msgpack` is used. (There are a couple ways to confirm this other than just checking source code Also, I added a minor refactor of a feature I added a year ago: I added `send_sigint()` to the custom subprocess hook, since this custom subprocess hook did not exist back when I added it (if you want me to split this refactor into a different PR then let me know). ### API change I decided the best way to go about this would be to just add a `partial_parse: bool` to both the execution config and render config. For example: ```python dbt_group = DbtTaskGroup( ..., execution_config=ExecutionConfig( ..., partial_parse=True ), render_config=RenderConfig( ..., partial_parse=False ) ) ``` That said, in all honesty users will not need to set this at all, except unless they want to suppress the little warning message about not being able to find the `partial_parse.msgpack`. This is because by default this addition searches for a msgpack if it exists, which is already the existing behavior in a sense, except right now the msgpack file never exists (dbt does look for it though). When inserting into the `AbstractDbtBaseOperator`, I did not use `global_boolean_flags`. See the subsection below for why. ### Other execution performance improvements The main reason I am adding this feature is that it should dramatically improve performance for users. However, it is not the only way to improve It's possible that we should add a way to add the flag `--no-write-json` as an explicit kwarg to the dbt base operator. Right now users can support this via `dbt_cmd_global_flags=["--no-write-json"]`. Some users (e.g. those using Elementary, or those using the dbt local operator `callback` kwarg) will want to write the JSON, but I suspect the majority of users will not. Similarly, `--log-level-file` is not used at all, and at minimum dbt should work best the vast majority of time with `--no-cache-selected-only` set. It's also possible there should be some sort of "performant" mode that automatically sets all these defaults for optimal performance: - `--no-write-json` - `--log-level-file=none` - `--no-cache-selected-only` Perhaps a "performant" config would be too cumbersome to implement (I would agree with that). In which case the docs could also have a section on performance tips. ### A note on `global_boolean_flags` I did not add the partial parse support to `global_boolean_flags` because it doesn't quite fit into the existing paradigm for this. Right now the default for each of these `global_boolean_flags` is False, whereas the default for partial parse is actually True. This makes fitting it in awkward. I think it's possible that just having a `tuple[str]` is insufficient here, as some flags you may want to add (not just `--no-partial-parse` but also `--no-write-json` are by default _True_ and must be explicitly turned off. Meaning that the parsing Cosmos does with flags of `'--{flag.replace("_", "-")}'` is ineffective for flags like this. Right now, we have an example of putting _no_ in front with `no_version_check`. Meaning that the default behavior of version checking is True, but the flag itself starts as negated so the default is actually `False`. My proposal is, instead of `global_boolean_flags: tuple[str]`, this should instead be `global_boolean_flags: tuple[str | tuple[str, str | None, str | None]]`. In the case that a global flag is a `tuple[str, str | None, str | None]`, then the first arg should be the flag, the second should be "if true," and the third should be "if false." `None` indicates, when true/false (respectively), then do nothing. For example: ```python class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): ... global_boolean_flags = ( ("no_version_check", "--no-version-check", None), ("cache_selected_only", "-cache-selected-only", None), ("partial_parse", None, "--no-partial-parse"), ) ``` And Cosmos want to support `str` parsing for backwards compatibility. It's pretty straightforward to convert the data type: ```python if isinstance(flag, str): flag = (flag, '--{flag.replace("_", "-")}', None) ``` ## Related Issue(s) - Resolves astronomer#791 - Partially resolves astronomer#785 - astronomer#785 should probably be split up into two different stages: (1) support for partial parsing (2) (a) dbt project dir / manifest / `partial_parse.msgpack` is allowed to come from cloud storage. (b) `dbt compile` is able to dump into cloud storage. ## Breaking Change? Should not break anything. This doesn't do anything when `partial_parse.msgpack` is missing, and the default behavior (`partial_parse=True`) does not alter the dbt cmd flags. ## Checklist - [x] I have made corresponding changes to the documentation (if required) - [x] I have added tests that prove my fix is effective or that my feature works --------- Co-authored-by: Tatiana Al-Chueyr <[email protected]> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Julian LaNeve <[email protected]> Co-authored-by: Justin Bandoro <[email protected]>
- Loading branch information
Showing
15 changed files
with
227 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
from pathlib import Path | ||
from tempfile import TemporaryDirectory | ||
from unittest.mock import MagicMock, patch | ||
import signal | ||
|
||
import pytest | ||
|
||
from cosmos.hooks.subprocess import FullOutputSubprocessHook | ||
|
||
OS_ENV_KEY = "SUBPROCESS_ENV_TEST" | ||
OS_ENV_VAL = "this-is-from-os-environ" | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"env,expected", | ||
[ | ||
({"ABC": "123", "AAA": "456"}, {"ABC": "123", "AAA": "456", OS_ENV_KEY: ""}), | ||
({}, {OS_ENV_KEY: ""}), | ||
(None, {OS_ENV_KEY: OS_ENV_VAL}), | ||
], | ||
ids=["with env", "empty env", "no env"], | ||
) | ||
def test_env(env, expected): | ||
""" | ||
Test that env variables are exported correctly to the command environment. | ||
When ``env`` is ``None``, ``os.environ`` should be passed to ``Popen``. | ||
Otherwise, the variables in ``env`` should be available, and ``os.environ`` should not. | ||
""" | ||
hook = FullOutputSubprocessHook() | ||
|
||
def build_cmd(keys, filename): | ||
""" | ||
Produce bash command to echo env vars into filename. | ||
Will always echo the special test var named ``OS_ENV_KEY`` into the file to test whether | ||
``os.environ`` is passed or not. | ||
""" | ||
return "\n".join(f"echo {k}=${k}>> {filename}" for k in [*keys, OS_ENV_KEY]) | ||
|
||
with TemporaryDirectory() as tmp_dir, patch.dict("os.environ", {OS_ENV_KEY: OS_ENV_VAL}): | ||
tmp_file = Path(tmp_dir, "test.txt") | ||
command = build_cmd(env and env.keys() or [], tmp_file.as_posix()) | ||
hook.run_command(command=["bash", "-c", command], env=env) | ||
actual = dict([x.split("=") for x in tmp_file.read_text().splitlines()]) | ||
assert actual == expected | ||
|
||
|
||
def test_subprocess_hook(): | ||
hook = FullOutputSubprocessHook() | ||
result = hook.run_command(command=["bash", "-c", f'echo "foo"']) | ||
assert result.exit_code == 0 | ||
assert result.output == "foo" | ||
assert result.full_output == ["foo"] | ||
|
||
|
||
@patch("os.getpgid", return_value=123) | ||
@patch("os.killpg") | ||
def test_send_sigint(mock_killpg, mock_getpgid): | ||
hook = FullOutputSubprocessHook() | ||
hook.sub_process = MagicMock() | ||
hook.send_sigint() | ||
mock_killpg.assert_called_with(123, signal.SIGINT) | ||
|
||
|
||
@patch("os.getpgid", return_value=123) | ||
@patch("os.killpg") | ||
def test_send_sigterm(mock_killpg, mock_getpgid): | ||
hook = FullOutputSubprocessHook() | ||
hook.sub_process = MagicMock() | ||
hook.send_sigterm() | ||
mock_killpg.assert_called_with(123, signal.SIGTERM) |
Oops, something went wrong.