From 7dc88e8235740f3118a9b02b3e07ef2d20aac889 Mon Sep 17 00:00:00 2001 From: atulya-astronomer Date: Mon, 23 Dec 2024 16:06:06 +0530 Subject: [PATCH] Datastage xml demo added (#22) * Add base files for DataStage * Update xml_demo.py Signed-off-by: akshaykumarsalunke <165752380+akshaykumarsalunke@users.noreply.github.com> * check cli * Delete tests/data_stage/demo/workflow/PH_Insert_EMPLOYEE_DIM.xml Signed-off-by: akshaykumarsalunke <165752380+akshaykumarsalunke@users.noreply.github.com> * Added lxml as dependency * Added translation demo file and tests * Changed lib from lxml to defusedxml * Upgraded version * Updated taskgroup rule as per orbiter req * Simplified XML demo * Update orbiter_translations/data_stage/xml_demo.py Co-authored-by: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Signed-off-by: atulya-astronomer * Update orbiter_translations/data_stage/xml_demo.py Co-authored-by: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Signed-off-by: atulya-astronomer * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Signed-off-by: akshaykumarsalunke <165752380+akshaykumarsalunke@users.noreply.github.com> Signed-off-by: atulya-astronomer Co-authored-by: akshaykumarsalunke <165752380+akshaykumarsalunke@users.noreply.github.com> Co-authored-by: akshaykumarsalunke Co-authored-by: fritz-astronomer <80706212+fritz-astronomer@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- orbiter_translations/__init__.py | 2 +- orbiter_translations/dag_factory/yaml_base.py | 2 +- orbiter_translations/data_stage/__init__.py | 0 orbiter_translations/data_stage/xml_demo.py | 157 ++++++++++++++++++ pyproject.toml | 3 + tests/data_stage/demo/data_stage_demo_test.py | 15 ++ tests/data_stage/demo/workflow/demo.xml | 115 +++++++++++++ 7 files changed, 292 insertions(+), 2 deletions(-) create mode 100644 orbiter_translations/data_stage/__init__.py create mode 100644 orbiter_translations/data_stage/xml_demo.py create mode 100644 tests/data_stage/demo/data_stage_demo_test.py create mode 100644 tests/data_stage/demo/workflow/demo.xml diff --git a/orbiter_translations/__init__.py b/orbiter_translations/__init__.py index 2546acc..cdef753 100644 --- a/orbiter_translations/__init__.py +++ b/orbiter_translations/__init__.py @@ -1,3 +1,3 @@ -__version__ = "0.3.0" +__version__ = "0.4.0" version = __version__ diff --git a/orbiter_translations/dag_factory/yaml_base.py b/orbiter_translations/dag_factory/yaml_base.py index a28f779..bbf3e1e 100644 --- a/orbiter_translations/dag_factory/yaml_base.py +++ b/orbiter_translations/dag_factory/yaml_base.py @@ -262,7 +262,7 @@ def task_group_rule(val: dict) -> OrbiterTaskGroup | None: if task_group_id := val.pop("__task_group_id", ""): _ = val.pop("dependencies", []) return OrbiterTaskGroup( - tasks=[], + tasks={}, task_group_id=task_group_id, **callback_args(val), ) diff --git a/orbiter_translations/data_stage/__init__.py b/orbiter_translations/data_stage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/orbiter_translations/data_stage/xml_demo.py b/orbiter_translations/data_stage/xml_demo.py new file mode 100644 index 0000000..d96c534 --- /dev/null +++ b/orbiter_translations/data_stage/xml_demo.py @@ -0,0 +1,157 @@ +from __future__ import annotations +from itertools import pairwise +from defusedxml import ElementTree +import inflection +import json +import jq +from orbiter.file_types import FileTypeXML +from orbiter.objects import conn_id +from orbiter.objects.dag import OrbiterDAG +from orbiter.objects.operators.empty import OrbiterEmptyOperator +from orbiter.objects.operators.sql import OrbiterSQLExecuteQueryOperator +from orbiter.objects.task import OrbiterOperator +from orbiter.objects.task_group import OrbiterTaskGroup +from orbiter.objects.task import OrbiterTaskDependency +from orbiter.rules import ( + dag_filter_rule, + dag_rule, + task_filter_rule, + task_rule, + task_dependency_rule, + cannot_map_rule, +) +from orbiter.rules.rulesets import ( + DAGFilterRuleset, + DAGRuleset, + TaskFilterRuleset, + TaskRuleset, + TaskDependencyRuleset, + PostProcessingRuleset, + TranslationRuleset, +) + + +@dag_filter_rule +def basic_dag_filter(val: dict) -> list | None: + """Filter input down to a list of dictionaries that can be processed by the `@dag_rules`""" + return val["DSExport"][0]["Job"] + + +@dag_rule +def basic_dag_rule(val: dict) -> OrbiterDAG | None: + """Translate input into an `OrbiterDAG`""" + try: + dag_id = val["@Identifier"] + dag_id = inflection.underscore(dag_id) + return OrbiterDAG(dag_id=dag_id, file_path=f"{dag_id}.py") + except Exception: + return None + + +@task_filter_rule +def basic_task_filter(val: dict) -> list | None: + """Filter input down to a list of dictionaries that can be processed by the `@task_rules`""" + if isinstance(val, dict): + val = json.loads(json.dumps(val, default=str)) # pre-serialize values, for JQ + try: + return ( + jq.compile(""".Record[] | select(.["@Type"] == "CustomStage")""") + .input_value(val) + .all() + ) + except StopIteration: + pass + return None + + +@task_rule(priority=2) +def basic_task_rule(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None: + """Translate input into an Operator (e.g. `OrbiterBashOperator`). will be applied first, with a higher priority""" + if "task_id" in val: + return OrbiterEmptyOperator(task_id=val["task_id"]) + else: + return None + + +def task_common_args(val: dict) -> dict: + """ + Common mappings for all tasks + """ + task_id: str = ( + jq.compile(""".Property[] | select(.["@Name"] == "Name") | .["#text"]""") + .input_value(val) + .first() + ) + task_id = inflection.underscore(task_id) + params = {"task_id": task_id} + return params + + +def extract_sql_statements(root): + sql_statements = {} + sql_tags = ["SelectStatement", "BeforeSQL", "AfterSQL"] + + for tag in sql_tags: + elements = root.findall(f".//{tag}") + for elem in elements: + if elem.text: + sql_text = elem.text.strip() + sql_statements[tag] = sql_text + return sql_statements + + +@task_rule(priority=2) +def sql_command_rule(val) -> OrbiterSQLExecuteQueryOperator | None: + """ + For SQLQueryOperator. + + """ # noqa: E501 + try: + sql: str = ( + jq.compile( + """.Collection[] | .SubRecord[] | .Property[] | select(.["@PreFormatted"] == "1") | .["#text"] """ + ) + .input_value(val) + .first() + ) + root = ElementTree.fromstring(sql.encode("utf-16")) + sql_statements = extract_sql_statements(root) + sql = " ".join(sql_statements.values()) + if sql: + return OrbiterSQLExecuteQueryOperator( + sql=sql, + **conn_id(conn_id="snowflake_default", conn_type="snowflake"), + **task_common_args(val), + ) + except StopIteration: + pass + return None + + +@task_dependency_rule +def basic_task_dependency_rule(val: OrbiterDAG) -> list | None: + """Translate input into a list of task dependencies""" + task_dependencies = [] + if len(val.tasks.values()) > 1: + for pre, post in pairwise(val.tasks.values()): + task_dependencies.append( + OrbiterTaskDependency( + task_id=pre.task_id, + downstream=post.task_id, + ) + ) + return task_dependencies + return [] + + +translation_ruleset = TranslationRuleset( + file_type={FileTypeXML}, + dag_filter_ruleset=DAGFilterRuleset(ruleset=[basic_dag_filter]), + dag_ruleset=DAGRuleset(ruleset=[basic_dag_rule]), + task_filter_ruleset=TaskFilterRuleset(ruleset=[basic_task_filter]), + task_ruleset=TaskRuleset( + ruleset=[sql_command_rule, basic_task_rule, cannot_map_rule] + ), + task_dependency_ruleset=TaskDependencyRuleset(ruleset=[basic_task_dependency_rule]), + post_processing_ruleset=PostProcessingRuleset(ruleset=[]), +) diff --git a/pyproject.toml b/pyproject.toml index 623a5a2..7cf7676 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,9 @@ dependencies = [ # for templating within @rules "jinja2", + # for parsing xml + "defusedxml", + # for deep-parsing dicts within @rules "jq", diff --git a/tests/data_stage/demo/data_stage_demo_test.py b/tests/data_stage/demo/data_stage_demo_test.py new file mode 100644 index 0000000..110d94e --- /dev/null +++ b/tests/data_stage/demo/data_stage_demo_test.py @@ -0,0 +1,15 @@ +from orbiter_translations.data_stage.xml_demo import translation_ruleset + + +def test_data_stage_demo(project_root): + actual = translation_ruleset.translate_fn( + translation_ruleset, (project_root / "tests/data_stage/demo/workflow/") + ) + assert list(actual.dags.keys()) == ["data_stage_job"] + assert sorted(list(list(actual.dags.values())[0].tasks.keys())) == sorted( + ["select_table", "unknown"] + ) + + assert sorted( + list(list(actual.dags.values())[0].tasks["select_table"].downstream) + ) == ["unknown"] diff --git a/tests/data_stage/demo/workflow/demo.xml b/tests/data_stage/demo/workflow/demo.xml new file mode 100644 index 0000000..63b8235 --- /dev/null +++ b/tests/data_stage/demo/workflow/demo.xml @@ -0,0 +1,115 @@ + + +
+ + + SELECT_TABLE + + + XMLProperties + <?xml version='1.0' + encoding='UTF-16'?><Properties + version='1.1'><Common><Context + type='int'>1</Context><Variant + type='string'>1.0</Variant><DescriptorVersion + type='string'>1.0</DescriptorVersion><PartitionType + type='int'>-1</PartitionType><RCP + type='int'>0</RCP></Common><Connection><URL + modified='1' + type='string'><![CDATA[jdbc:snowflake://xyz.us-east-1.snowflakecomputing.com/?&warehouse=#XYZ_DB.$snowflake_wh#&db=#DB.$schema#]]></URL><Username + modified='1' + type='string'><![CDATA[#DB.$snowflake_userid#]]></Username><Password + modified='1' + type='string'><![CDATA[#DB.$snowflake_passwd#]]></Password><Attributes + modified='1' + type='string'><![CDATA[]]></Attributes></Connection><Usage><ReadMode + type='int'><![CDATA[0]]></ReadMode><GenerateSQL + modified='1' + type='bool'><![CDATA[0]]></GenerateSQL><EnableQuotedIDs + type='bool'><![CDATA[0]]></EnableQuotedIDs><SQL><SelectStatement + collapsed='1' modified='1' + type='string'><![CDATA[ + Select 1 as Dummy from db.schema.table Limit 1;]]><ReadFromFileSelect + type='bool'><![CDATA[0]]></ReadFromFileSelect></SelectStatement><EnablePartitionedReads + type='bool'><![CDATA[0]]></EnablePartitionedReads></SQL><Transaction><RecordCount + type='int'><![CDATA[2000]]></RecordCount><IsolationLevel + type='int'><![CDATA[0]]></IsolationLevel><AutocommitMode + modified='1' + type='int'><![CDATA[1]]></AutocommitMode><EndOfWave + type='int'><![CDATA[0]]></EndOfWave><BeginEnd + collapsed='1' + type='bool'><![CDATA[0]]></BeginEnd></Transaction><Session><ArraySize + type='int'><![CDATA[1]]></ArraySize><FetchSize + type='int'><![CDATA[0]]></FetchSize><ReportSchemaMismatch + type='bool'><![CDATA[0]]></ReportSchemaMismatch><DefaultLengthForColumns + type='int'><![CDATA[200]]></DefaultLengthForColumns><DefaultLengthForLongColumns + type='int'><![CDATA[20000]]></DefaultLengthForLongColumns><CharacterSetForNonUnicodeColumns + collapsed='1' + type='int'><![CDATA[0]]></CharacterSetForNonUnicodeColumns><KeepConductorConnectionAlive + type='bool'><![CDATA[1]]></KeepConductorConnectionAlive></Session><BeforeAfter + modified='1' type='bool'><![CDATA[1]]><BeforeSQL + collapsed='1' modified='1' + type='string'><![CDATA[]]><ReadFromFileBeforeSQL + type='bool'><![CDATA[0]]></ReadFromFileBeforeSQL><FailOnError + type='bool'><![CDATA[1]]></FailOnError></BeforeSQL><AfterSQL + collapsed='1' modified='1' + type='string'><![CDATA[ + SELECT * FROM db.schema.table; + ]]><ReadFromFileAfterSQL + type='bool'><![CDATA[0]]></ReadFromFileAfterSQL><FailOnError + type='bool'><![CDATA[1]]></FailOnError></AfterSQL><BeforeSQLNode + type='string'><![CDATA[]]><ReadFromFileBeforeSQLNode + type='bool'><![CDATA[0]]></ReadFromFileBeforeSQLNode><FailOnError + type='bool'><![CDATA[1]]></FailOnError></BeforeSQLNode><AfterSQLNode + type='string'><![CDATA[]]><ReadFromFileAfterSQLNode + type='bool'><![CDATA[0]]></ReadFromFileAfterSQLNode><FailOnError + type='bool'><![CDATA[1]]></FailOnError></AfterSQLNode></BeforeAfter><Java><ConnectorClasspath + type='string'><![CDATA[$(DSHOME)/../DSComponents/bin/ccjdbc.jar;$(DSHOME)]]></ConnectorClasspath><HeapSize + modified='1' + type='int'><![CDATA[1024]]></HeapSize><ConnectorOtherOptions + type='string'><![CDATA[-Dcom.ibm.is.cc.options=noisfjars]]></ConnectorOtherOptions></Java><LimitRows + collapsed='1' + type='bool'><![CDATA[0]]></LimitRows></Usage></Properties + > + + + 0 + + + Peek_SF_Ld + 2 + V253S1P1 + PxPeek + 0 + + + all + \(20) + + + nrecs + 10 + + + dataset + \(20) + + + name + name + + + columns + \(20) + + + selection + \(20) + + + 0 + + +