Datastage xml demo added (#22)
* Add base files for DataStage

* Update xml_demo.py

Signed-off-by: akshaykumarsalunke <[email protected]>

* check cli

* Delete tests/data_stage/demo/workflow/PH_Insert_EMPLOYEE_DIM.xml

Signed-off-by: akshaykumarsalunke <[email protected]>

* Added lxml as dependency

* Added translation demo file and tests

* Changed lib from lxml to defusedxml

* Upgraded version

* Updated taskgroup rule as per orbiter req

* Simplified XML demo

* Update orbiter_translations/data_stage/xml_demo.py

Co-authored-by: fritz-astronomer <[email protected]>
Signed-off-by: atulya-astronomer <[email protected]>

* Update orbiter_translations/data_stage/xml_demo.py

Co-authored-by: fritz-astronomer <[email protected]>
Signed-off-by: atulya-astronomer <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: akshaykumarsalunke <[email protected]>
Signed-off-by: atulya-astronomer <[email protected]>
Co-authored-by: akshaykumarsalunke <[email protected]>
Co-authored-by: akshaykumarsalunke <[email protected]>
Co-authored-by: fritz-astronomer <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
5 people authored Dec 23, 2024
1 parent d7d6bda commit 7dc88e8
Showing 7 changed files with 292 additions and 2 deletions.
2 changes: 1 addition & 1 deletion orbiter_translations/__init__.py
@@ -1,3 +1,3 @@
__version__ = "0.3.0"
__version__ = "0.4.0"

version = __version__
2 changes: 1 addition & 1 deletion orbiter_translations/dag_factory/yaml_base.py
@@ -262,7 +262,7 @@ def task_group_rule(val: dict) -> OrbiterTaskGroup | None:
    if task_group_id := val.pop("__task_group_id", ""):
        _ = val.pop("dependencies", [])
        return OrbiterTaskGroup(
-            tasks=[],
+            tasks={},
            task_group_id=task_group_id,
            **callback_args(val),
        )
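Per the commit message ("Updated taskgroup rule as per orbiter req"), the one-line change above switches `tasks` from a list to a dict, which suggests recent orbiter releases expect a mapping of tasks here. A hedged illustration of the updated call, using only the fields visible in the diff:

# Illustration only: an empty task group built with a dict-valued `tasks`,
# mirroring the updated rule above. Additional required fields, if any, are omitted.
from orbiter.objects.task_group import OrbiterTaskGroup

group = OrbiterTaskGroup(task_group_id="example_group", tasks={})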
Empty file.
157 changes: 157 additions & 0 deletions orbiter_translations/data_stage/xml_demo.py
@@ -0,0 +1,157 @@
from __future__ import annotations
from itertools import pairwise
from defusedxml import ElementTree
import inflection
import json
import jq
from orbiter.file_types import FileTypeXML
from orbiter.objects import conn_id
from orbiter.objects.dag import OrbiterDAG
from orbiter.objects.operators.empty import OrbiterEmptyOperator
from orbiter.objects.operators.sql import OrbiterSQLExecuteQueryOperator
from orbiter.objects.task import OrbiterOperator
from orbiter.objects.task_group import OrbiterTaskGroup
from orbiter.objects.task import OrbiterTaskDependency
from orbiter.rules import (
    dag_filter_rule,
    dag_rule,
    task_filter_rule,
    task_rule,
    task_dependency_rule,
    cannot_map_rule,
)
from orbiter.rules.rulesets import (
    DAGFilterRuleset,
    DAGRuleset,
    TaskFilterRuleset,
    TaskRuleset,
    TaskDependencyRuleset,
    PostProcessingRuleset,
    TranslationRuleset,
)


@dag_filter_rule
def basic_dag_filter(val: dict) -> list | None:
    """Filter input down to a list of dictionaries that can be processed by the `@dag_rules`"""
    return val["DSExport"][0]["Job"]


@dag_rule
def basic_dag_rule(val: dict) -> OrbiterDAG | None:
    """Translate input into an `OrbiterDAG`"""
    try:
        dag_id = val["@Identifier"]
        dag_id = inflection.underscore(dag_id)
        return OrbiterDAG(dag_id=dag_id, file_path=f"{dag_id}.py")
    except Exception:
        return None


@task_filter_rule
def basic_task_filter(val: dict) -> list | None:
    """Filter input down to a list of dictionaries that can be processed by the `@task_rules`"""
    if isinstance(val, dict):
        val = json.loads(json.dumps(val, default=str))  # pre-serialize values, for JQ
        try:
            return (
                jq.compile(""".Record[] | select(.["@Type"] == "CustomStage")""")
                .input_value(val)
                .all()
            )
        except StopIteration:
            pass
    return None


@task_rule(priority=2)
def basic_task_rule(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None:
    """Translate input into an Operator (e.g. `OrbiterBashOperator`). Will be applied first, with a higher priority"""
    if "task_id" in val:
        return OrbiterEmptyOperator(task_id=val["task_id"])
    else:
        return None


def task_common_args(val: dict) -> dict:
    """
    Common mappings for all tasks
    """
    task_id: str = (
        jq.compile(""".Property[] | select(.["@Name"] == "Name") | .["#text"]""")
        .input_value(val)
        .first()
    )
    task_id = inflection.underscore(task_id)
    params = {"task_id": task_id}
    return params


def extract_sql_statements(root):
    sql_statements = {}
    sql_tags = ["SelectStatement", "BeforeSQL", "AfterSQL"]

    for tag in sql_tags:
        elements = root.findall(f".//{tag}")
        for elem in elements:
            if elem.text:
                sql_text = elem.text.strip()
                sql_statements[tag] = sql_text
    return sql_statements


@task_rule(priority=2)
def sql_command_rule(val) -> OrbiterSQLExecuteQueryOperator | None:
    """
    For SQLQueryOperator.
    """  # noqa: E501
    try:
        sql: str = (
            jq.compile(
                """.Collection[] | .SubRecord[] | .Property[] | select(.["@PreFormatted"] == "1") | .["#text"] """
            )
            .input_value(val)
            .first()
        )
        root = ElementTree.fromstring(sql.encode("utf-16"))
        sql_statements = extract_sql_statements(root)
        sql = " ".join(sql_statements.values())
        if sql:
            return OrbiterSQLExecuteQueryOperator(
                sql=sql,
                **conn_id(conn_id="snowflake_default", conn_type="snowflake"),
                **task_common_args(val),
            )
    except StopIteration:
        pass
    return None


@task_dependency_rule
def basic_task_dependency_rule(val: OrbiterDAG) -> list | None:
    """Translate input into a list of task dependencies"""
    task_dependencies = []
    if len(val.tasks.values()) > 1:
        for pre, post in pairwise(val.tasks.values()):
            task_dependencies.append(
                OrbiterTaskDependency(
                    task_id=pre.task_id,
                    downstream=post.task_id,
                )
            )
        return task_dependencies
    return []


translation_ruleset = TranslationRuleset(
    file_type={FileTypeXML},
    dag_filter_ruleset=DAGFilterRuleset(ruleset=[basic_dag_filter]),
    dag_ruleset=DAGRuleset(ruleset=[basic_dag_rule]),
    task_filter_ruleset=TaskFilterRuleset(ruleset=[basic_task_filter]),
    task_ruleset=TaskRuleset(
        ruleset=[sql_command_rule, basic_task_rule, cannot_map_rule]
    ),
    task_dependency_ruleset=TaskDependencyRuleset(ruleset=[basic_task_dependency_rule]),
    post_processing_ruleset=PostProcessingRuleset(ruleset=[]),
)
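For reference, a minimal sketch (not part of this commit) of what the jq query in `basic_task_filter` selects, run against a hand-written dict that approximates the structure `FileTypeXML` produces for the demo export further below; the sample record values are assumptions for illustration only:

import jq

# Hand-written stand-in for one parsed <Job> element (assumed shape, for illustration).
job = {
    "Record": [
        {"@Identifier": "V253S0", "@Type": "CustomStage",
         "Property": [{"@Name": "Name", "#text": "SELECT_TABLE"}]},
        {"@Identifier": "V253S1", "@Type": "ContainerView"},
    ]
}

# Same query as basic_task_filter: keep only the CustomStage records.
custom_stages = (
    jq.compile(""".Record[] | select(.["@Type"] == "CustomStage")""")
    .input_value(job)
    .all()
)
print([r["@Identifier"] for r in custom_stages])  # ['V253S0']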
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -35,6 +35,9 @@ dependencies = [
    # for templating within @rules
    "jinja2",

    # for parsing xml
    "defusedxml",

    # for deep-parsing dicts within @rules
    "jq",

15 changes: 15 additions & 0 deletions tests/data_stage/demo/data_stage_demo_test.py
@@ -0,0 +1,15 @@
from orbiter_translations.data_stage.xml_demo import translation_ruleset


def test_data_stage_demo(project_root):
    actual = translation_ruleset.translate_fn(
        translation_ruleset, (project_root / "tests/data_stage/demo/workflow/")
    )
    assert list(actual.dags.keys()) == ["data_stage_job"]
    assert sorted(list(list(actual.dags.values())[0].tasks.keys())) == sorted(
        ["select_table", "unknown"]
    )

    assert sorted(
        list(list(actual.dags.values())[0].tasks["select_table"].downstream)
    ) == ["unknown"]
115 changes: 115 additions & 0 deletions tests/data_stage/demo/workflow/demo.xml
@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>
<DSExport>
<Header CharacterSet="CP1252" ExportingTool="IBM InfoSphere DataStage Export" ToolVersion="8"
ServerName="AAAAAA" ToolInstanceID="AAAA" Date="2024-11-21" Time="10.40.02"
ServerVersion="11.7" />
<Job Identifier="DataStage_Job" DateModified="2020-11-27" TimeModified="05.07.33">
<Record Identifier="V253S0" Type="CustomStage" Readonly="0">
<Property Name="Name">SELECT_TABLE</Property>
<Collection Name="Properties" Type="CustomProperty">
<SubRecord>
<Property Name="Name">XMLProperties</Property>
<Property Name="Value" PreFormatted="1">&lt;?xml version=&apos;1.0&apos;
encoding=&apos;UTF-16&apos;?&gt;&lt;Properties
version=&apos;1.1&apos;&gt;&lt;Common&gt;&lt;Context
type=&apos;int&apos;&gt;1&lt;/Context&gt;&lt;Variant
type=&apos;string&apos;&gt;1.0&lt;/Variant&gt;&lt;DescriptorVersion
type=&apos;string&apos;&gt;1.0&lt;/DescriptorVersion&gt;&lt;PartitionType
type=&apos;int&apos;&gt;-1&lt;/PartitionType&gt;&lt;RCP
type=&apos;int&apos;&gt;0&lt;/RCP&gt;&lt;/Common&gt;&lt;Connection&gt;&lt;URL
modified=&apos;1&apos;
type=&apos;string&apos;&gt;&lt;![CDATA[jdbc:snowflake://xyz.us-east-1.snowflakecomputing.com/?&amp;warehouse=#XYZ_DB.$snowflake_wh#&amp;db=#DB.$schema#]]&gt;&lt;/URL&gt;&lt;Username
modified=&apos;1&apos;
type=&apos;string&apos;&gt;&lt;![CDATA[#DB.$snowflake_userid#]]&gt;&lt;/Username&gt;&lt;Password
modified=&apos;1&apos;
type=&apos;string&apos;&gt;&lt;![CDATA[#DB.$snowflake_passwd#]]&gt;&lt;/Password&gt;&lt;Attributes
modified=&apos;1&apos;
type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;/Attributes&gt;&lt;/Connection&gt;&lt;Usage&gt;&lt;ReadMode
type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadMode&gt;&lt;GenerateSQL
modified=&apos;1&apos;
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/GenerateSQL&gt;&lt;EnableQuotedIDs
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EnableQuotedIDs&gt;&lt;SQL&gt;&lt;SelectStatement
collapsed=&apos;1&apos; modified=&apos;1&apos;
type=&apos;string&apos;&gt;&lt;![CDATA[
Select 1 as Dummy from db.schema.table Limit 1;]]&gt;&lt;ReadFromFileSelect
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileSelect&gt;&lt;/SelectStatement&gt;&lt;EnablePartitionedReads
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EnablePartitionedReads&gt;&lt;/SQL&gt;&lt;Transaction&gt;&lt;RecordCount
type=&apos;int&apos;&gt;&lt;![CDATA[2000]]&gt;&lt;/RecordCount&gt;&lt;IsolationLevel
type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/IsolationLevel&gt;&lt;AutocommitMode
modified=&apos;1&apos;
type=&apos;int&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/AutocommitMode&gt;&lt;EndOfWave
type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/EndOfWave&gt;&lt;BeginEnd
collapsed=&apos;1&apos;
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/BeginEnd&gt;&lt;/Transaction&gt;&lt;Session&gt;&lt;ArraySize
type=&apos;int&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/ArraySize&gt;&lt;FetchSize
type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/FetchSize&gt;&lt;ReportSchemaMismatch
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReportSchemaMismatch&gt;&lt;DefaultLengthForColumns
type=&apos;int&apos;&gt;&lt;![CDATA[200]]&gt;&lt;/DefaultLengthForColumns&gt;&lt;DefaultLengthForLongColumns
type=&apos;int&apos;&gt;&lt;![CDATA[20000]]&gt;&lt;/DefaultLengthForLongColumns&gt;&lt;CharacterSetForNonUnicodeColumns
collapsed=&apos;1&apos;
type=&apos;int&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/CharacterSetForNonUnicodeColumns&gt;&lt;KeepConductorConnectionAlive
type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/KeepConductorConnectionAlive&gt;&lt;/Session&gt;&lt;BeforeAfter
modified=&apos;1&apos; type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;BeforeSQL
collapsed=&apos;1&apos; modified=&apos;1&apos;
type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileBeforeSQL
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileBeforeSQL&gt;&lt;FailOnError
type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/BeforeSQL&gt;&lt;AfterSQL
collapsed=&apos;1&apos; modified=&apos;1&apos;
type=&apos;string&apos;&gt;&lt;![CDATA[
SELECT * FROM db.schema.table;
]]&gt;&lt;ReadFromFileAfterSQL
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileAfterSQL&gt;&lt;FailOnError
type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/AfterSQL&gt;&lt;BeforeSQLNode
type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileBeforeSQLNode
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileBeforeSQLNode&gt;&lt;FailOnError
type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/BeforeSQLNode&gt;&lt;AfterSQLNode
type=&apos;string&apos;&gt;&lt;![CDATA[]]&gt;&lt;ReadFromFileAfterSQLNode
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/ReadFromFileAfterSQLNode&gt;&lt;FailOnError
type=&apos;bool&apos;&gt;&lt;![CDATA[1]]&gt;&lt;/FailOnError&gt;&lt;/AfterSQLNode&gt;&lt;/BeforeAfter&gt;&lt;Java&gt;&lt;ConnectorClasspath
type=&apos;string&apos;&gt;&lt;![CDATA[$(DSHOME)/../DSComponents/bin/ccjdbc.jar;$(DSHOME)]]&gt;&lt;/ConnectorClasspath&gt;&lt;HeapSize
modified=&apos;1&apos;
type=&apos;int&apos;&gt;&lt;![CDATA[1024]]&gt;&lt;/HeapSize&gt;&lt;ConnectorOtherOptions
type=&apos;string&apos;&gt;&lt;![CDATA[-Dcom.ibm.is.cc.options=noisfjars]]&gt;&lt;/ConnectorOtherOptions&gt;&lt;/Java&gt;&lt;LimitRows
collapsed=&apos;1&apos;
type=&apos;bool&apos;&gt;&lt;![CDATA[0]]&gt;&lt;/LimitRows&gt;&lt;/Usage&gt;&lt;/Properties
&gt;</Property>
</SubRecord>
</Collection>
<Property Name="NextRecordID">0</Property>
</Record>
<Record Identifier="V253S1" Type="CustomStage" Readonly="0">
<Property Name="Name">Peek_SF_Ld</Property>
<Property Name="NextID">2</Property>
<Property Name="InputPins">V253S1P1</Property>
<Property Name="StageType">PxPeek</Property>
<Property Name="AllowColumnMapping">0</Property>
<Collection Name="Properties" Type="CustomProperty">
<SubRecord>
<Property Name="Name">all</Property>
<Property Name="Value">\(20)</Property>
</SubRecord>
<SubRecord>
<Property Name="Name">nrecs</Property>
<Property Name="Value">10</Property>
</SubRecord>
<SubRecord>
<Property Name="Name">dataset</Property>
<Property Name="Value">\(20)</Property>
</SubRecord>
<SubRecord>
<Property Name="Name">name</Property>
<Property Name="Value">name</Property>
</SubRecord>
<SubRecord>
<Property Name="Name">columns</Property>
<Property Name="Value">\(20)</Property>
</SubRecord>
<SubRecord>
<Property Name="Name">selection</Property>
<Property Name="Value">\(20)</Property>
</SubRecord>
</Collection>
<Property Name="NextRecordID">0</Property>
</Record>
</Job>
</DSExport>
