Use callback in case of command failure #867
🤖 Hey @fabiomx, long time no see! Hope you've been coding away without too many bugs. 😉

To ensure that the `callback` is invoked even when the dbt command fails, you can wrap the exception handling in a `try`/`finally` block so that the callback runs regardless of the outcome. Here's how you could modify the `handle_exception_subprocess` and `handle_exception_dbt_runner` methods:

```python
def handle_exception_subprocess(self, result: FullOutputSubprocessResult) -> None:
    try:
        if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code:
            raise AirflowSkipException(f"dbt command returned exit code {self.skip_exit_code}. Skipping.")
        elif result.exit_code != 0:
            raise AirflowException(
                f"dbt command failed. The command returned a non-zero exit code {result.exit_code}. Details: ",
                *result.full_output,
            )
    finally:
        if self.callback:
            self.callback(self.project_dir)
        self.upload_run_results_to_gcs(self.project_dir)

def handle_exception_dbt_runner(self, result: dbtRunnerResult) -> None:
    try:
        if not result.success:
            if result.exception:
                raise AirflowException(f"dbt invocation did not complete with unhandled error: {result.exception}")
            else:
                node_names, node_results = extract_dbt_runner_issues(result, ["error", "fail", "runtime error"])
                error_message = "\n".join([f"{name}: {result}" for name, result in zip(node_names, node_results)])
                raise AirflowException(f"dbt invocation completed with errors: {error_message}")
    finally:
        if self.callback:
            self.callback(self.project_dir)
        self.upload_run_results_to_gcs(self.project_dir)

def upload_run_results_to_gcs(self, project_dir: str) -> None:
    # Implement the logic to upload run_results.json to GCS.
    # This is a placeholder function; replace it with actual upload code.
    pass
```

In this modification, the `finally` block guarantees that the callback and the upload run whether or not an exception is raised. This approach allows you to handle dbt command failures gracefully by ensuring that necessary cleanup or follow-up actions (like calling a callback and uploading files to GCS) are still performed even in the event of a failure.

The current implementation of the `DbtLocalBaseOperator` already accepts a `callback` parameter. Here is the relevant part of the constructor signature:

```python
def __init__(
    self,
    profile_config: ProfileConfig,
    invocation_mode: InvocationMode | None = None,
    install_deps: bool = False,
    callback: Callable[[str], None] | None = None,
    should_store_compiled_sql: bool = True,
    **kwargs: Any,
) -> None:
    ...
    self.callback = callback
    ...
```

This implementation detail indicates that the callback is stored on the operator and is called with the project directory path.
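To make the placeholder concrete, here is a minimal sketch of what the GCS upload could look like. It assumes the `google-cloud-storage` package; the bucket name, blob prefix, and the injectable `client` parameter are hypothetical choices made for illustration (the client injection keeps the function usable without real GCP credentials):

```python
from __future__ import annotations

from pathlib import Path


def upload_run_results_to_gcs(
    project_dir: str,
    bucket_name: str = "my-dbt-artifacts",  # hypothetical bucket name
    client=None,
) -> str | None:
    """Upload <project_dir>/target/run_results.json to GCS, if it exists.

    Returns the destination blob name, or None when the file is absent.
    `client` can be any object with a bucket(name).blob(name) interface,
    which makes the function easy to exercise with a fake client.
    """
    run_results = Path(project_dir) / "target" / "run_results.json"
    if not run_results.exists():
        return None
    if client is None:
        # Imported lazily so the function works with a fake client even
        # when google-cloud-storage is not installed.
        from google.cloud import storage

        client = storage.Client()
    blob_name = f"dbt-artifacts/{run_results.name}"
    client.bucket(bucket_name).blob(blob_name).upload_from_filename(str(run_results))
    return blob_name
```

The same function body could serve either as the `upload_run_results_to_gcs` method of a custom operator subclass or as a standalone callback passed via the `callback` argument.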
Hi @fabiomx, we recently merged PR #1389, which introduces minor changes to the existing callback functionality and will be included in the upcoming Cosmos 1.8.0 release. In that PR, we also made changes so that the callback is called first and then the exceptions are raised/handled.

To allow users to try out these changes ahead of the official release, we have prepared an alpha release. You can install it using the following link: astronomer-cosmos 1.8.0a3.

PR #1389 also provides examples showcasing how to use this callback functionality. For additional guidance, refer to the documentation on leveraging callbacks: Callback Configuration. The helper functions demonstrated in the examples can be found here: cosmos/io.py. However, you are not limited to these; you can create your own custom callback functions using these examples as a reference and pass them via the `callback` argument.

We would greatly appreciate any feedback you have after testing this alpha release!
As mentioned in this Slack thread with @jlaneve, I've built a custom local operator, based on the `DbtLocalBaseOperator`, to upload the `run_results.json` file to GCS after running the `test` command, using the `callback` parameter.

Everything works fine if the tests pass, but if any test fails, an Airflow exception is raised and the `callback` is not called (https://github.com/astronomer/astronomer-cosmos/blob/main/cosmos/operators/local.py#L321-L344).

At least for the `test` command, I would still need to upload the `run_results.json` produced in the `tmp_project_dir`, even in the case of failure. Indeed, when the tests fail is precisely when the information in `run_results.json` is most critical. Moreover, after the failure I can't access the `tmp_project_dir` anymore, so I haven't been able to use the `on_failure_callback` either.