Add docs comparing Python and YAML-based DAGs (#327)
Add documentation illustrating how two Airflow Python DAGs (one using
traditional operators, another using TaskFlow API) are represented in
Python (standard Airflow) and YAML (DAG Factory).

We use real-world data: one DAG uses Hacker News, and the other uses
PyPI stats.

Before we merge this PR, I think we should merge #328 first and rebase
this one. I isolated the tooling changes (mkdocs and others) in PR #328.

Close: #319
tatiana authored Jan 3, 2025
1 parent 177f608 commit 38555b1
Showing 26 changed files with 423 additions and 4 deletions.
19 changes: 19 additions & 0 deletions dev/dags/comparison/example_hackernews_dagfactory.yml
@@ -0,0 +1,19 @@
example_hackernews_dagfactory:
  default_args:
    start_date: 2022-03-04
  tasks:
    fetch_top_ten_news:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "curl -s https://hacker-news.firebaseio.com/v0/topstories.json | jq -c -r '.[0:10]'"
    fetch_first_top_news:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'"
      dependencies: [fetch_top_ten_news]
    fetch_second_top_news:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'"
      dependencies: [fetch_top_ten_news]
    summarize:
      operator: airflow.operators.python.PythonOperator
      python_callable: hacker_news.summarize
      dependencies: [fetch_first_top_news, fetch_second_top_news]
31 changes: 31 additions & 0 deletions dev/dags/comparison/example_hackernews_plain_airflow.py
@@ -0,0 +1,31 @@
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python import PythonOperator
from hacker_news import summarize

with DAG(dag_id="example_hackernews_plain_airflow", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    fetch_top_ten_news = BashOperator(
        task_id="fetch_top_ten_news",
        bash_command="curl -s https://hacker-news.firebaseio.com/v0/topstories.json | jq -c -r '.[0:10]'",
    )

    fetch_first_top_news = BashOperator(
        task_id="fetch_first_top_news",
        bash_command="""
        echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'
        """,
    )

    fetch_second_top_news = BashOperator(
        task_id="fetch_second_top_news",
        bash_command="""
        echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'
        """,
    )

    summarize = PythonOperator(task_id="summarize", python_callable=summarize)

    fetch_top_ten_news >> [fetch_first_top_news, fetch_second_top_news] >> summarize
16 changes: 16 additions & 0 deletions dev/dags/comparison/example_pypi_stats_dagfactory.yml
@@ -0,0 +1,16 @@
example_pypi_stats_dagfactory:
  default_args:
    start_date: 2022-03-04
  tasks:
    get_pypi_projects_list:
      decorator: airflow.decorators.task
      python_callable: pypi_stats.get_pypi_projects_list
    fetch_pypi_stats_data:
      decorator: airflow.decorators.task
      python_callable: pypi_stats.fetch_pypi_stats_data
      expand:
        package_name: +get_pypi_projects_list
    summarize:
      decorator: airflow.decorators.task
      python_callable: pypi_stats.summarize
      values: +fetch_pypi_stats_data
25 changes: 25 additions & 0 deletions dev/dags/comparison/example_pypi_stats_plain_airflow.py
@@ -0,0 +1,25 @@
from __future__ import annotations

from datetime import datetime
from typing import Any

from airflow.decorators import task
from airflow.models.dag import DAG
from pypi_stats import fetch_pypi_stats_data, get_pypi_projects_list, summarize

with DAG(dag_id="example_pypi_stats_plain_airflow", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    @task
    def get_pypi_projects_list_():
        return get_pypi_projects_list()

    @task
    def fetch_pypi_stats_data_(project_name: str):
        return fetch_pypi_stats_data(project_name)

    @task
    def summarize_(values: list[dict[str, Any]]):
        return summarize(values)

    pypi_stats_data = fetch_pypi_stats_data_.expand(project_name=get_pypi_projects_list_())
    summarize_(pypi_stats_data)
13 changes: 13 additions & 0 deletions dev/dags/example_load_yaml_dags.py
@@ -0,0 +1,13 @@
import os
from pathlib import Path

from dagfactory import load_yaml_dags

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
config_dir = str(CONFIG_ROOT_DIR / "comparison")

load_yaml_dags(
globals_dict=globals(),
dags_folder=config_dir,
)
27 changes: 27 additions & 0 deletions dev/dags/hacker_news.py
@@ -0,0 +1,27 @@
from __future__ import annotations

import json

import pandas as pd

# ----8<--- [ start: hacker_news ]


def summarize(**kwargs):
    """
    Given that the Airflow context is provided to this function, extract the Hacker News records from the XComs of its
    upstream tasks and summarize them in Markdown.
    """
    ti = kwargs["ti"]
    upstream_task_ids = ti.task.upstream_task_ids  # Get upstream task IDs dynamically
    values = [json.loads(ti.xcom_pull(task_ids=task_id)) for task_id in upstream_task_ids]

    df = pd.DataFrame(values)
    selected_columns = ["title", "url"]
    df = df[selected_columns]
    markdown_output = df.to_markdown(index=False)
    print(markdown_output)
    return markdown_output


# ----8<--- [ end: hacker_news ]
66 changes: 66 additions & 0 deletions dev/dags/pypi_stats.py
@@ -0,0 +1,66 @@
"""
PyPI stats utility functions.
"""

from __future__ import annotations

from typing import Any

import httpx
import pandas as pd

DEFAULT_PYPI_PROJECTS = [
"apache-airflow",
"dag-factory",
"astronomer-cosmos",
]


# ----8<--- [ start: pypi_stats ]


def get_pypi_projects_list(**kwargs: dict[str, Any]) -> list[str]:
    """
    Return a list of PyPI project names to be analysed.
    """
    projects_from_ui = kwargs.get("dag_run").conf.get("pypi_projects") if kwargs.get("dag_run") else None
    if projects_from_ui is None:
        pypi_projects = DEFAULT_PYPI_PROJECTS
    else:
        pypi_projects = projects_from_ui
    return pypi_projects


def fetch_pypi_stats_data(package_name: str) -> dict[str, Any]:
    """
    Given a PyPI project name, return the PyPI stats data associated with it.
    """
    url = f"https://pypistats.org/api/packages/{package_name}/recent"
    package_json = httpx.get(url).json()
    package_data = package_json["data"]
    package_data["package_name"] = package_name
    return package_data


def summarize(values: list[dict[str, Any]]):
    """
    Given a list of PyPI stats data, create a table summarizing it, sorted by the last day's total downloads.
    """
    df = pd.DataFrame(values)
    first_column = "package_name"
    sorted_columns = [first_column] + [col for col in df.columns if col != first_column]
    df = df[sorted_columns].sort_values(by="last_day", ascending=False)
    markdown_output = df.to_markdown(index=False)
    print(markdown_output)
    return markdown_output


# ----8<--- [ end: pypi_stats ]

if __name__ == "__main__":
    pypi_projects_list = get_pypi_projects_list()
    all_data = []
    for pypi_project_name in pypi_projects_list:
        project_data = fetch_pypi_stats_data(pypi_project_name)
        all_data.append(project_data)
    summarize(values=all_data)
11 changes: 11 additions & 0 deletions docs/comparison/index.md
@@ -0,0 +1,11 @@
# Using YAML instead of Python

By default, Apache Airflow® users write their workflows, or sequences of tasks, in Python.

DAG Factory offers an alternative interface, allowing users to represent Airflow workflows via YAML files, often using less code.
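
For instance, a short Python file is all that is needed to register YAML-defined workflows with the Airflow scheduler. The sketch below mirrors `dev/dags/example_load_yaml_dags.py` from this repository; the folder path is illustrative:

```python
from dagfactory import load_yaml_dags

# Scan the folder for DAG Factory YAML files and register the resulting DAGs
# in this module's globals so the Airflow scheduler can discover them.
load_yaml_dags(
    globals_dict=globals(),
    dags_folder="/usr/local/airflow/dags/comparison",  # illustrative path
)
```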

This section illustrates a few examples of how to represent the same workflow using plain Airflow Python DAGs in comparison
to their representation using DAG Factory YAML files.

* [Traditional Airflow Operators](traditional_operators.md)
* [TaskFlow API](taskflow_api.md)
105 changes: 105 additions & 0 deletions docs/comparison/taskflow_api.md
@@ -0,0 +1,105 @@
# TaskFlow API: Using YAML instead of Python

For users who employ many Python functions in their DAGs, the [TaskFlow API](https://www.astronomer.io/docs/learn/airflow-decorators/) represents a simpler way to transform functions into tasks, with a more intuitive way of passing data between them.
It was introduced in Airflow 2 as an alternative to Airflow's [traditional operators](traditional_operators.md).
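
As a minimal sketch (illustrative names, not part of this repository), two decorated functions are wired together simply by passing one's return value to the other:

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2022, 3, 4))
def minimal_taskflow_example():
    @task
    def extract() -> int:
        return 42

    @task
    def report(value: int) -> None:
        print(f"received {value}")

    # Calling report() with extract()'s output both declares the dependency
    # and hands the value over via XCom behind the scenes.
    report(extract())


minimal_taskflow_example()
```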

Below, we illustrate how to represent an Airflow DAG using the TaskFlow API and how to define the same DAG using
DAG Factory. Ultimately, both implementations use the same Airflow operators. The main difference is the language used
to declare the workflow: while one uses Python, the other uses YAML.


## Goal

Let's say we'd like to create a workflow that performs the following:

1. Create a list of [PyPI](https://pypi.org/) projects to be analysed
2. Fetch the [statistics](https://pypistats.org/) for each of these projects
3. Summarize the selected statistics as Markdown, using Python

We will implement all these steps using the Airflow `task` decorator, and the last task will generate a Markdown table similar to:

```
| package_name | last_day | last_month | last_week |
|:------------------|-----------:|-------------:|------------:|
| apache-airflow | 852242 | 28194255 | 6253861 |
| astronomer-cosmos | 442531 | 13354870 | 3127750 |
| dag-factory | 10078 | 354085 | 77752 |
```

The main logic is implemented as plain Python functions in [pypi_stats.py](https://github.com/astronomer/dag-factory/blob/main/dev/dags/pypi_stats.py):

```title="pypi_stats.py"
--8<-- "dev/dags/pypi_stats.py:pypi_stats"
```


## Implementation

As a reference, the following workflows run using Airflow 2.10.2 and DAG Factory 0.21.0.

### Plain Airflow Python DAG

```title="example_pypi_stats_plain_airflow.py"
--8<-- "dev/dags/comparison/example_pypi_stats_plain_airflow.py"
```

### Alternative DAG Factory YAML

```title="example_pypi_stats_dagfactory.yml"
--8<-- "dev/dags/comparison/example_pypi_stats_dagfactory.yml"
```
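
Note the `+get_pypi_projects_list` and `+fetch_pypi_stats_data` values above: as these examples suggest, the `+` prefix is how a DAG Factory YAML file references the output of another task defined in the same workflow.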


## Comparison

### Goal

Both implementations accomplish the same goal and result in the expected Markdown table.

### Airflow Graph view

As can be seen in the screenshots below, the DAG created using Python with standard Airflow and the
DAG created using YAML and DAG Factory look identical, both from a graph topology perspective and in terms of the
underlying operators being used.

#### Graph view: Plain Airflow Python DAG

![alt text](../static/example_pypi_stats_plain_airflow_graph.png "Python DAG Graph visualization")

#### Graph view: Alternative DAG Factory YAML

![alt text](../static/example_pypi_stats_dagfactory_graph.png "YAML DAG Graph visualization")

### Airflow Dynamic Task Mapping

In both workflows, we dynamically generate one mapped task for each PyPI project.
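
In plain Python, this is done with `expand()`; the following minimal sketch (illustrative names, separate from the DAGs above) shows the mechanism:

```python
from __future__ import annotations

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2022, 3, 4))
def minimal_mapping_example():
    @task
    def list_projects() -> list[str]:
        return ["apache-airflow", "dag-factory", "astronomer-cosmos"]

    @task
    def fetch(package_name: str) -> str:
        return f"stats for {package_name}"

    # expand() creates one mapped task instance per list element at run time,
    # so the task count follows the data rather than the DAG definition.
    fetch.expand(package_name=list_projects())


minimal_mapping_example()
```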

#### Mapped Tasks: Plain Airflow Python DAG

![alt text](../static/example_pypi_stats_plain_airflow_mapped_tasks.png "Python DAG mapped tasks")

#### Mapped Tasks: Alternative DAG Factory YAML

![alt text](../static/example_pypi_stats_dagfactory_mapped_tasks.png "YAML DAG mapped tasks")


### Airflow Code view

From an Airflow UI perspective, the content displayed in the "Code" view is the main difference between the two implementations. While Airflow renders the original Python DAG as expected, for the YAML DAGs it displays the Python file that references the DAG Factory YAML files:

```title="example_load_yaml_dags.py"
--8<-- "dev/dags/example_load_yaml_dags.py"
```

#### Code view: Plain Airflow Python DAG

![alt text](../static/example_pypi_stats_plain_airflow_code.png "Python DAG code visualization")

#### Code view: Alternative DAG Factory YAML

![alt text](../static/example_pypi_stats_dagfactory_code.png "YAML DAG code visualization")

To overcome this limitation, DAG Factory appends the YAML content to the DAG Documentation so users can better troubleshoot
the DAG:

![alt text](../static/example_pypi_stats_dagfactory_docs.png "YAML DAG docs visualization")