Skip to content

Commit

Permalink
Small fix for Orbiter v1.2.1 (#16)
Browse files Browse the repository at this point in the history
* cicd: remove duplicate pre-commit check

* fix: pre-serialize dag dict for jq
  • Loading branch information
fritz-astronomer authored Sep 3, 2024
1 parent 4d9b861 commit 1230da7
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 20 deletions.
14 changes: 0 additions & 14 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,6 @@ on: [pull_request]
permissions:
contents: write
jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.10'
cache: 'pip'
- uses: extractions/setup-just@v2
- run: just install
- run: |
git fetch origin
pre-commit run --from-ref origin/${{ github.event.pull_request.base.ref }} --to-ref ${{ github.event.pull_request.head.sha }}
test:
runs-on: ubuntu-latest
strategy:
Expand Down
2 changes: 1 addition & 1 deletion orbiter_translations/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = "0.2.2"
__version__ = "0.2.3"

version = __version__
15 changes: 10 additions & 5 deletions orbiter_translations/ssis/xml_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from pendulum import DateTime, Timezone
with DAG(dag_id='demo.extract_sample_currency_data', schedule=None, start_date=DateTime(1970, 1, 1, 0, 0, 0), catchup=False, doc_md=...):
extract_sample_currency_data_task = EmptyOperator(task_id='extract_sample_currency_data', doc_md='Input did not translate: `{"@id": "1", "@name": "Extract Sample Currency Data", "connections": [{"connection": [{"@id": "6", "@name": "FlatFileConnection", "@connectionManagerID": "{EA76C836-FF8B-4E34-B273-81D4F67FCB3D}"}]}], "outputs": [{"output": [{"@id": "2", "@name": "Flat File Source Output"}, {"@id": "3", "@name": "Flat File Source Error Output"}]}]}`')
extract_sample_currency_data_task = EmptyOperator(task_id='extract_sample_currency_data', doc_md=...)
lookup_currency_key_task = SQLExecuteQueryOperator(task_id='lookup_currency_key', conn_id='mssql_default', sql="select * from (select * from [dbo].[DimCurrency]) as refTable where [refTable].[CurrencyAlternateKey] = 'ARS' OR [refTable].[CurrencyAlternateKey] = 'VEB'")
lookup_date_key_task = SQLExecuteQueryOperator(task_id='lookup_date_key', conn_id='mssql_default', sql='select * from [dbo].[DimTime]')
sample_ole__db_destination_task = EmptyOperator(task_id='sample_ole__db_destination', doc_md='Input did not translate: `{"@id": "100", "@name": "Sample OLE DB Destination", "@componentClassID": "{5A0B62E8-D91D-49F5-94A5-7BE58DE508F0}", "inputs": [{"input": [{"@id": "113", "@name": "OLE DB Destination Input"}]}], "outputs": null}`')
sample_ole__db_destination_task = EmptyOperator(task_id='sample_ole__db_destination', doc_md=...)
extract_sample_currency_data_task >> lookup_currency_key_task
lookup_currency_key_task >> lookup_date_key_task
lookup_date_key_task >> sample_ole__db_destination_task
Expand All @@ -54,6 +54,8 @@
""" # noqa: E501

from __future__ import annotations

import json
from copy import deepcopy
from pathlib import Path

Expand Down Expand Up @@ -175,6 +177,7 @@ def basic_dag_rule(val: dict) -> OrbiterDAG | None:
```
""" # noqa: E501
if isinstance(val, dict):
val = json.loads(json.dumps(val, default=str)) # pre-serialize values, for JQ
try:
# Pipeline name is <DTS:Property><DTS:Name="ObjectName">...</DTS:Name>
pipeline_name = (
Expand Down Expand Up @@ -247,6 +250,7 @@ def task_filter_rule(val: dict) -> list[dict] | None:
```
""" # noqa: E501
if isinstance(val, dict):
val = json.loads(json.dumps(val, default=str)) # pre-serialize values, for JQ
try:
return (
jq.compile(
Expand Down Expand Up @@ -338,8 +342,6 @@ def simple_task_dependencies(
```
""" # noqa: E501
# Descend through all the tasks, and get their `<outputs><output id='...'>` elements

# Descend through all the tasks, and get their `<inputs><input id='...'>` elements
output_to_task_id = {
# @id -> task_id
output_id: task.task_id
Expand All @@ -349,6 +351,7 @@ def simple_task_dependencies(
for o in (output.get("output", []) or [])
if (output_id := o.get("@id"))
}
# Descend through all the tasks, and get their `<inputs><input id='...'>` elements
input_to_task_id = {
# @id -> task_id
input_id: task.task_id
Expand All @@ -359,7 +362,9 @@ def simple_task_dependencies(
if (input_id := i.get("@id"))
}

dag_original_input = val.orbiter_kwargs.get("val", {})
dag_original_input = json.loads(
json.dumps(val.orbiter_kwargs.get("val", {}), default=str)
) # pre-serialize values, for JQ
# noinspection PyUnboundLocalVariable
return [
OrbiterTaskDependency(
Expand Down

0 comments on commit 1230da7

Please sign in to comment.