Use callback in case of command failure #867

Closed
fabiomx opened this issue Feb 28, 2024 · 2 comments · Fixed by #1389

Labels: area:execution, dbt:test, epic-assigned, execution:callback, execution:local

fabiomx (Contributor) commented Feb 28, 2024

As mentioned in this Slack thread with @jlaneve, I've built a custom local operator based on the DbtLocalBaseOperator that uses the callback parameter to upload the run_results.json file to GCS after running the test command.
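
For context, the setup described is roughly the following (an illustrative sketch, not the actual custom operator; profile_config and the project path are placeholders, and the upload logic is elided):

from cosmos.operators.local import DbtTestLocalOperator

def upload_artifacts(project_dir: str) -> None:
    # e.g. copy <project_dir>/target/run_results.json to a GCS bucket
    ...

test_dbt_models = DbtTestLocalOperator(
    task_id="test_dbt_models",
    project_dir="/path/to/dbt/project",
    profile_config=profile_config,  # assumed to be defined elsewhere
    callback=upload_artifacts,
)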

Everything works fine when the tests pass, but if any test fails, an Airflow exception is raised and the callback is never called (https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/operators/local.py#L321-L344).

[2024-02-28, 15:26:13 CET] {subprocess.py:94} INFO - 14:26:13  Done. PASS=114 WARN=0 ERROR=1 SKIP=0 TOTAL=115
[2024-02-28, 15:26:14 CET] {subprocess.py:98} INFO - Command exited with return code 1
...
[2024-02-28, 15:26:14 CET] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 366, in execute
    self.build_and_run_cmd(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 361, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 247, in run_command
    self.exception_handling(result)
  File "/opt/python3.8/lib/python3.8/site-packages/cosmos/operators/local.py", line 128, in exception_handling
    raise AirflowException(
airflow.exceptions.AirflowException: ('dbt command failed. The command returned a non-zero exit code 1.
...

At least for the test command, I would still need to upload the run_results.json produced in the tmp_project_dir, even in the case of failure. Indeed, it's precisely when tests fail that the information in run_results.json is most critical. Moreover, after the failure I can no longer access the tmp_project_dir, so I haven't been able to use the on_failure_callback either.

dosubot (bot) commented Feb 28, 2024

🤖

Hey @fabiomx, long time no see! Hope you've been coding away without too many bugs. 😉

To ensure that the callback is still invoked and the run_results.json file is uploaded to GCS even when the test command fails and an Airflow exception is raised in the DbtLocalBaseOperator, you can modify the exception-handling logic in both the handle_exception_subprocess and handle_exception_dbt_runner methods: wrap the exception-raising code in a try block and, in a finally block, call the callback (if it is set) and then upload run_results.json to GCS.

Here's how you can modify the handle_exception_subprocess and handle_exception_dbt_runner methods:

def handle_exception_subprocess(self, result: FullOutputSubprocessResult) -> None:
    try:
        if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code:
            raise AirflowSkipException(f"dbt command returned exit code {self.skip_exit_code}. Skipping.")
        elif result.exit_code != 0:
            raise AirflowException(
                f"dbt command failed. The command returned a non-zero exit code {result.exit_code}. Details: ",
                *result.full_output,
            )
    finally:
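        # finally runs whether or not an exception was raised above, so the
        # callback and the upload still happen when the dbt command fails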
        if self.callback:
            self.callback(self.project_dir)
        self.upload_run_results_to_gcs(self.project_dir)

def handle_exception_dbt_runner(self, result: dbtRunnerResult) -> None:
    try:
        if not result.success:
            if result.exception:
                raise AirflowException(f"dbt invocation did not complete with unhandled error: {result.exception}")
            else:
                node_names, node_results = extract_dbt_runner_issues(result, ["error", "fail", "runtime error"])
                error_message = "\n".join([f"{name}: {result}" for name, result in zip(node_names, node_results)])
                raise AirflowException(f"dbt invocation completed with errors: {error_message}")
    finally:
        if self.callback:
            self.callback(self.project_dir)
        self.upload_run_results_to_gcs(self.project_dir)

def upload_run_results_to_gcs(self, project_dir: str) -> None:
    # Implement the logic to upload run_results.json to GCS
    # This is a placeholder function. You need to replace it with actual code to upload to GCS.
    pass

In this modification, the finally block ensures that the callback function is called and the run_results.json file is uploaded to GCS regardless of whether an exception was raised due to a failed dbt command. You need to implement the upload_run_results_to_gcs method with the actual logic to upload the file to GCS, which is not provided here.
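
As a rough sketch, the placeholder could be filled in with the Google provider's GCSHook (imports shown here would live at module level; the bucket name, object path, and connection id are placeholders, and dbt is assumed to have written run_results.json to the project's target/ directory):

from pathlib import Path

from airflow.providers.google.cloud.hooks.gcs import GCSHook

def upload_run_results_to_gcs(self, project_dir: str) -> None:
    # run_results.json is written by dbt under <project_dir>/target/
    run_results = Path(project_dir) / "target" / "run_results.json"
    if run_results.exists():
        GCSHook(gcp_conn_id="google_cloud_default").upload(
            bucket_name="my-dbt-artifacts",
            object_name=f"run_results/{self.task_id}.json",
            filename=str(run_results),
        )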

This approach allows you to handle the dbt command failure gracefully by ensuring that necessary cleanup or follow-up actions (like calling a callback and uploading files to GCS) are still performed even in the event of a failure.

Currently, the callback parameter of the DbtLocalBaseOperator accepts a callable that takes a single argument, a string with the path to the dbt project directory, and returns None. The callback is intended to be called after a dbt run completes.

Here is the relevant part of the constructor signature from the DbtLocalBaseOperator class that shows how the callback parameter is defined:

def __init__(
        self,
        profile_config: ProfileConfig,
        invocation_mode: InvocationMode | None = None,
        install_deps: bool = False,
        callback: Callable[[str], None] | None = None,
        should_store_compiled_sql: bool = True,
        **kwargs: Any,
    ) -> None:
        ...
        self.callback = callback
        ...

This shows that the callback parameter is designed to be flexible: users of the DbtLocalBaseOperator can run custom logic after a dbt command completes by passing a function that operates on the path to the dbt project directory.
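
Concretely, any function matching that signature can be supplied; a minimal illustration (the print is a stand-in for real logic such as copying files out of the temporary project directory):

def my_callback(project_dir: str) -> None:
    # project_dir points at the (temporary) dbt project directory
    print(f"dbt finished, artifacts are under {project_dir}/target")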

This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

dosubot added the area:execution, dbt:test, and execution:local labels Feb 28, 2024
tatiana added this to the 1.5.0 milestone May 17, 2024
tatiana added the triage-needed label May 17, 2024
tatiana removed the triage-needed label May 17, 2024
tatiana mentioned this issue May 17, 2024
tatiana modified the milestones: Cosmos 1.5.0, Cosmos 1.6.0 Jun 6, 2024
tatiana modified the milestones: Cosmos 1.6.0, Cosmos 1.7.0 Jul 5, 2024
tatiana modified the milestones: Cosmos 1.7.0, Triage Sep 20, 2024
tatiana modified the milestones: Triage, Cosmos 1.8.0 Oct 30, 2024
tatiana added the execution:callback label Nov 29, 2024

pankajkoti (Contributor) commented Dec 17, 2024

Hi @fabiomx, we recently merged PR #1389, which introduces minor changes to the existing callback functionality and will be included in the upcoming Cosmos 1.8.0 release. In the PR, we also changed the order of operations so that the callback is invoked first and the exceptions are raised/handled afterwards.

To allow users to try out these changes ahead of the official release, we have prepared an alpha release. You can install it using the following link: astronomer-cosmos 1.8.0a3. PR #1389 also provides examples showcasing how to use this callback functionality.

For additional guidance, refer to the documentation on leveraging callbacks: Callback Configuration. The helper functions demonstrated in the examples can be found here: cosmos/io.py. However, you are not limited to these; you can create your own custom callback functions using these examples as a reference and pass them via the callback argument.
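
For example, a hand-rolled callback along these lines could be wired into a DbtDag through operator_args (a hedged sketch: the bucket name, connection id, paths, and DAG settings are illustrative, and the exact wiring should be checked against the Callback Configuration docs and cosmos/io.py):

from datetime import datetime
from pathlib import Path

from airflow.providers.google.cloud.hooks.gcs import GCSHook
from cosmos import DbtDag, ProfileConfig, ProjectConfig

def upload_run_results_to_gcs(project_dir: str, **kwargs) -> None:
    # Copy run_results.json from the temporary project dir to a GCS bucket.
    run_results = Path(project_dir) / "target" / "run_results.json"
    if run_results.exists():
        GCSHook(gcp_conn_id="google_cloud_default").upload(
            bucket_name="my-dbt-artifacts",
            object_name="jaffle_shop/run_results.json",
            filename=str(run_results),
        )

dag = DbtDag(
    dag_id="jaffle_shop_with_callback",
    project_config=ProjectConfig("/path/to/jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        profiles_yml_filepath="/path/to/profiles.yml",
    ),
    operator_args={"callback": upload_run_results_to_gcs},
    schedule=None,
    start_date=datetime(2024, 1, 1),
)

With the 1.8.0 change described above, the callback runs before exceptions are raised, so an upload like this happens even when tests fail.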

We would greatly appreciate any feedback you have after testing this alpha release!
