Skip to content

Commit

Permalink
Merge pull request #860 from dyvenia/modify_transform_catalog_commands
Browse files · Browse the repository at this point in the history
Modify `TransformAndCatalogToLuma` process and commands
  • Loading branch information
Rafalz13 authored Apr 9, 2024
2 parents: 624f5b8 + 0fe6897 · commit bb4c4d5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 21 deletions.
25 changes: 7 additions & 18 deletions viadot/flows/transform_and_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ def gen_flow(self) -> Flow:
run_select = self.dbt_selects.get("run")
run_select_safe = f"-s {run_select}" if run_select is not None else ""

run = ShellTask(
name="dbt_task_run",
run_model = ShellTask(
name="dbt_task_run_model",
command=f"dbt run {run_select_safe} {dbt_target_option}",
helper_script=f"cd {local_dbt_repo_path}",
return_all=True,
Expand All @@ -202,24 +202,14 @@ def gen_flow(self) -> Flow:
test_select = self.dbt_selects.get("test", run_select)
test_select_safe = f"-s {test_select}" if test_select is not None else ""

test = ShellTask(
name="dbt_task_test",
run_tests = ShellTask(
name="dbt_task_run_tests",
command=f"dbt test {test_select_safe} {dbt_target_option}",
helper_script=f"cd {local_dbt_repo_path}",
return_all=True,
stream_output=True,
).bind(flow=self)

# Generate docs
# Produces `catalog.json`, `run-results.json`, and `manifest.json`

generate_catalog_json = custom_shell_task.bind(
name="dbt_task_docs_generate",
command=f"dbt docs generate {dbt_target_option} --no-compile",
helper_script=f"cd {self.dbt_project_path}",
flow=self,
)

# Upload build metadata to Luma
path_expanded = os.path.expandvars(self.metadata_dir_path)
metadata_dir_path = Path(path_expanded)
Expand All @@ -237,8 +227,7 @@ def gen_flow(self) -> Flow:

dbt_clean_up.set_upstream(clone, flow=self)
pull_dbt_deps.set_upstream(dbt_clean_up, flow=self)
run.set_upstream(pull_dbt_deps, flow=self)
test.set_upstream(run, flow=self)
generate_catalog_json.set_upstream(test, flow=self)
upload_metadata_luma.set_upstream(generate_catalog_json, flow=self)
run_model.set_upstream(pull_dbt_deps, flow=self)
run_tests.set_upstream(run_model, flow=self)
upload_metadata_luma.set_upstream(run_tests, flow=self)
_cleanup_repo.set_upstream(upload_metadata_luma, flow=self)
8 changes: 5 additions & 3 deletions viadot/tasks/luma.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(
self,
metadata_dir_path: str,
url: str = None,
dbt_project_path: str = None,
dbt_project_path: str = "tmp_dbt_repo_dir",
credentials_secret: str = None,
vault_name: str = None,
*args,
Expand All @@ -26,6 +26,8 @@ def __init__(
In the case of dbt, it's dbt project's `target` directory, which contains dbt artifacts
(`sources.json`, `catalog.json`, `manifest.json`, and `run_results.json`).
url (str, optional): The url of the Luma ingestion API. Defaults to None.
dbt_project_path (str, optional): The path to the dbt project (the directory containing
the `dbt_project.yml` file). Defaults to 'tmp_dbt_repo_dir'.
credentials_secret (str, optional): The name of the Azure Key Vault secret containing Luma credentials.
Defaults to None.
vault_name (str, optional): The name of the vault from which to obtain the secrets. Defaults to None.
Expand All @@ -38,10 +40,10 @@ def __init__(
secret=credentials_secret, vault_name=vault_name
)
url = json.loads(credentials_str).get("url")
self.helper_script = dbt_project_path
self.helper_script = f"cd {dbt_project_path}"
self.url = url
self.metadata_dir_path = metadata_dir_path
self.command = f"luma dbt send-test-results --luma-url {url} --metadata-dir {metadata_dir_path}"
self.command = f"luma dbt send-test-results --luma-url {url} --metadata-dir {metadata_dir_path} --no-config"
self.return_all = True
self.stream_output = True
self.log_stderr = True

0 comments on commit bb4c4d5

Please sign in to comment.