From 1230da7550bb50868222d8073f16929ad598012b Mon Sep 17 00:00:00 2001 From: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:59:54 -0400 Subject: [PATCH] Small fix for Orbiter v1.2.1 (#16) * cicd: remove duplicate pre-commit check * fix: pre-serialize dag dict for jq --- .github/workflows/checks.yml | 14 -------------- orbiter_translations/__init__.py | 2 +- orbiter_translations/ssis/xml_demo.py | 15 ++++++++++----- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 3189e26..82f31fb 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -3,20 +3,6 @@ on: [pull_request] permissions: contents: write jobs: - check: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 - with: - python-version: '3.10' - cache: 'pip' - - uses: extractions/setup-just@v2 - - run: just install - - run: | - git fetch origin - pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref ${{ github.event.pull_request.head.sha }} - test: runs-on: ubuntu-latest strategy: diff --git a/orbiter_translations/__init__.py b/orbiter_translations/__init__.py index 807e163..45c2168 100644 --- a/orbiter_translations/__init__.py +++ b/orbiter_translations/__init__.py @@ -1,3 +1,3 @@ -__version__ = "0.2.2" +__version__ = "0.2.3" version = __version__ diff --git a/orbiter_translations/ssis/xml_demo.py b/orbiter_translations/ssis/xml_demo.py index 2e5b3e1..c5f1aba 100644 --- a/orbiter_translations/ssis/xml_demo.py +++ b/orbiter_translations/ssis/xml_demo.py @@ -42,10 +42,10 @@ from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from pendulum import DateTime, Timezone with DAG(dag_id='demo.extract_sample_currency_data', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...): - extract_sample_currency_data_task = EmptyOperator(task_id='extract_sample_currency_data', doc_md='Input did not translate: `{"@id": "1", "@name": "Extract Sample Currency Data", "connections": [{"connection": [{"@id": "6", "@name": "FlatFileConnection", "@connectionManagerID": "{EA76C836-FF8B-4E34-B273-81D4F67FCB3D}"}]}], "outputs": [{"output": [{"@id": "2", "@name": "Flat File Source Output"}, {"@id": "3", "@name": "Flat File Source Error Output"}]}]}`') + extract_sample_currency_data_task = EmptyOperator(task_id='extract_sample_currency_data', doc_md=...) lookup_currency_key_task = SQLExecuteQueryOperator(task_id='lookup_currency_key', conn_id='mssql_default', sql="select * from (select * from [dbo].[DimCurrency]) as refTable where [refTable].[CurrencyAlternateKey] = 'ARS' OR [refTable].[CurrencyAlternateKey] = 'VEB'") lookup_date_key_task = SQLExecuteQueryOperator(task_id='lookup_date_key', conn_id='mssql_default', sql='select * from [dbo].[DimTime]') - sample_ole__db_destination_task = EmptyOperator(task_id='sample_ole__db_destination', doc_md='Input did not translate: `{"@id": "100", "@name": "Sample OLE DB Destination", "@componentClassID": "{5A0B62E8-D91D-49F5-94A5-7BE58DE508F0}", "inputs": [{"input": [{"@id": "113", "@name": "OLE DB Destination Input"}]}], "outputs": null}`') + sample_ole__db_destination_task = EmptyOperator(task_id='sample_ole__db_destination', doc_md=...) extract_sample_currency_data_task >> lookup_currency_key_task lookup_currency_key_task >> lookup_date_key_task lookup_date_key_task >> sample_ole__db_destination_task @@ -54,6 +54,8 @@ """ # noqa: E501 from __future__ import annotations + +import json from copy import deepcopy from pathlib import Path @@ -175,6 +177,7 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None: ``` """ # noqa: E501 if isinstance(val, dict): + val = json.loads(json.dumps(val, default=str)) # pre-serialize values, for JQ try: # Pipeline name is ... pipeline_name = ( @@ -247,6 +250,7 @@ def task_filter_rule(val: dict) -> list[dict] | None: ``` """ # noqa: E501 if isinstance(val, dict): + val = json.loads(json.dumps(val, default=str)) # pre-serialize values, for JQ try: return ( jq.compile( @@ -338,8 +342,6 @@ def simple_task_dependencies( ``` """ # noqa: E501 # Descend through all the tasks, and get their `` elements - - # Descend through all the tasks, and get their `` elements output_to_task_id = { # @id -> task_id output_id: task.task_id @@ -349,6 +351,7 @@ def simple_task_dependencies( for o in (output.get("output", []) or []) if (output_id := o.get("@id")) } + # Descend through all the tasks, and get their `` elements input_to_task_id = { # @id -> task_id input_id: task.task_id @@ -359,7 +362,9 @@ def simple_task_dependencies( if (input_id := i.get("@id")) } - dag_original_input = val.orbiter_kwargs.get("val", {}) + dag_original_input = json.loads( + json.dumps(val.orbiter_kwargs.get("val", {}), default=str) + ) # pre-serialize values, for JQ # noinspection PyUnboundLocalVariable return [ OrbiterTaskDependency(