diff --git a/dev/dags/comparison/example_hackernews_dagfactory.yml b/dev/dags/comparison/example_hackernews_dagfactory.yml new file mode 100644 index 00000000..2cc082e8 --- /dev/null +++ b/dev/dags/comparison/example_hackernews_dagfactory.yml @@ -0,0 +1,19 @@ +example_hackernews_dagfactory: + default_args: + start_date: 2022-03-04 + tasks: + fetch_top_ten_news: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "curl -s https://hacker-news.firebaseio.com/v0/topstories.json | jq -c -r '.[0:10]'" + fetch_first_top_news: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'" + dependencies: [fetch_top_ten_news] + fetch_second_top_news: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'" + dependencies: [fetch_top_ten_news] + summarize: + operator: airflow.operators.python.PythonOperator + python_callable: hacker_news.summarize + dependencies: [fetch_first_top_news, fetch_second_top_news] diff --git a/dev/dags/comparison/example_hackernews_plain_airflow.py b/dev/dags/comparison/example_hackernews_plain_airflow.py new file mode 100644 index 00000000..f334f148 --- /dev/null +++ b/dev/dags/comparison/example_hackernews_plain_airflow.py @@ -0,0 +1,31 @@ +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python import PythonOperator +from hacker_news import summarize + +with DAG(dag_id="example_hackernews_plain_airflow", schedule=None, start_date=datetime(2022, 3, 4)) as dag: + + fetch_top_ten_news = BashOperator( + task_id="fetch_top_ten_news", + bash_command="curl -s 
https://hacker-news.firebaseio.com/v0/topstories.json | jq -c -r '.[0:10]'", + ) + + fetch_first_top_news = BashOperator( + task_id="fetch_first_top_news", + bash_command=""" + echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json' + """, + ) + + fetch_second_top_news = BashOperator( + task_id="fetch_second_top_news", + bash_command=""" + echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' | xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json' + """, + ) + + summarize = PythonOperator(task_id="summarize", python_callable=summarize) + + fetch_top_ten_news >> [fetch_first_top_news, fetch_second_top_news] >> summarize diff --git a/dev/dags/comparison/example_pypi_stats_dagfactory.yml b/dev/dags/comparison/example_pypi_stats_dagfactory.yml new file mode 100644 index 00000000..8738beab --- /dev/null +++ b/dev/dags/comparison/example_pypi_stats_dagfactory.yml @@ -0,0 +1,16 @@ +example_pypi_stats_dagfactory: + default_args: + start_date: 2022-03-04 + tasks: + get_pypi_projects_list: + decorator: airflow.decorators.task + python_callable: pypi_stats.get_pypi_projects_list + fetch_pypi_stats_data: + decorator: airflow.decorators.task + python_callable: pypi_stats.fetch_pypi_stats_data + expand: + package_name: +get_pypi_projects_list + summarize: + decorator: airflow.decorators.task + python_callable: pypi_stats.summarize + values: +fetch_pypi_stats_data diff --git a/dev/dags/comparison/example_pypi_stats_plain_airflow.py b/dev/dags/comparison/example_pypi_stats_plain_airflow.py new file mode 100644 index 00000000..57a6d784 --- /dev/null +++ b/dev/dags/comparison/example_pypi_stats_plain_airflow.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from airflow.decorators import task +from airflow.models.dag import DAG +from pypi_stats import 
fetch_pypi_stats_data, get_pypi_projects_list, summarize + +with DAG(dag_id="example_pypi_stats_plain_airflow", schedule=None, start_date=datetime(2022, 3, 4)) as dag: + + @task + def get_pypi_projects_list_(): + return get_pypi_projects_list() + + @task + def fetch_pypi_stats_data_(project_name: str): + return fetch_pypi_stats_data(project_name) + + @task + def summarize_(values: list[dict[str, Any]]): + return summarize(values) + + pypi_stats_data = fetch_pypi_stats_data_.expand(project_name=get_pypi_projects_list_()) + summarize_(pypi_stats_data) diff --git a/dev/dags/example_load_yaml_dags.py b/dev/dags/example_load_yaml_dags.py new file mode 100644 index 00000000..1963f637 --- /dev/null +++ b/dev/dags/example_load_yaml_dags.py @@ -0,0 +1,13 @@ +import os +from pathlib import Path + +from dagfactory import load_yaml_dags + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) +config_dir = str(CONFIG_ROOT_DIR / "comparison") + +load_yaml_dags( + globals_dict=globals(), + dags_folder=config_dir, +) diff --git a/dev/dags/hacker_news.py b/dev/dags/hacker_news.py new file mode 100644 index 00000000..169607ee --- /dev/null +++ b/dev/dags/hacker_news.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import json + +import pandas as pd + +# ----8<--- [ start: hacker_news ] + + +def summarize(**kwargs): + """ + Given the Airflow context is provided to this function, it will extract the XCom hackernews records from its + upstream tasks and summarise in Markdown. 
+ """ + ti = kwargs["ti"] + upstream_task_ids = ti.task.upstream_task_ids # Get upstream task IDs dynamically + values = [json.loads(ti.xcom_pull(task_ids=task_id)) for task_id in upstream_task_ids] + + df = pd.DataFrame(values) + selected_columns = ["title", "url"] + df = df[selected_columns] + markdown_output = df.to_markdown(index=False) + print(markdown_output) + return markdown_output + + +# ----8<--- [ end: hacker_news ] diff --git a/dev/dags/pypi_stats.py b/dev/dags/pypi_stats.py new file mode 100644 index 00000000..067a8bd7 --- /dev/null +++ b/dev/dags/pypi_stats.py @@ -0,0 +1,66 @@ +""" +PyPI stats utility functions. +""" + +from __future__ import annotations + +from typing import Any + +import httpx +import pandas as pd + +DEFAULT_PYPI_PROJECTS = [ + "apache-airflow", + "dag-factory", + "astronomer-cosmos", +] + + +# ----8<--- [ start: pypi_stats ] + + +def get_pypi_projects_list(**kwargs: dict[str, Any]) -> list[str]: + """ + Return a list of PyPI project names to be analysed. + """ + projects_from_ui = kwargs.get("dag_run").conf.get("pypi_projects") if kwargs.get("dag_run") else None + if projects_from_ui is None: + pypi_projects = DEFAULT_PYPI_PROJECTS + else: + pypi_projects = projects_from_ui + return pypi_projects + + +def fetch_pypi_stats_data(package_name: str) -> dict[str, Any]: + """ + Given a PyPI project name, return the PyPI stats data associated to it. + """ + url = f"https://pypistats.org/api/packages/{package_name}/recent" + package_json = httpx.get(url).json() + package_data = package_json["data"] + package_data["package_name"] = package_name + return package_data + + +def summarize(values: list[dict[str, Any]]): + """ + Given a list with PyPI stats data, create a table summarizing it, sorting by the last day total downloads. 
+ """ + df = pd.DataFrame(values) + first_column = "package_name" + sorted_columns = [first_column] + [col for col in df.columns if col != first_column] + df = df[sorted_columns].sort_values(by="last_day", ascending=False) + markdown_output = df.to_markdown(index=False) + print(markdown_output) + return markdown_output + + +# ----8<--- [ end: pypi_stats ] + +if __name__ == "__main__": + pypi_projects_list = get_pypi_projects_list() + all_data = [] + for pypi_project_name in pypi_projects_list: + project_data = fetch_pypi_stats_data(pypi_project_name) + all_data.append(project_data) + summarize(data=all_data) diff --git a/docs/comparison/index.md b/docs/comparison/index.md new file mode 100644 index 00000000..2b7e65f7 --- /dev/null +++ b/docs/comparison/index.md @@ -0,0 +1,11 @@ +# Using YAML instead of Python + +By default, Apache Airflow® users write their workflows, or sequences of tasks, in Python. + +DAG Factory offers an alternative interface, allowing users to represent Airflow workflows via YAML files, often using less code. + +This section illustrates a few examples of how to represent the same workflow using plain Airflow Python DAGs in comparison +to their representation using DAG Factory YAML files. + +* [Traditional Airflow Operators](traditional_operators.md) +* [TaskFlow API](traditional_operators.md) diff --git a/docs/comparison/taskflow_api.md b/docs/comparison/taskflow_api.md new file mode 100644 index 00000000..ca614dcd --- /dev/null +++ b/docs/comparison/taskflow_api.md @@ -0,0 +1,105 @@ +# TaskFlow API: Using YAML instead of Python + +For users that employ lots of Python functions in their DAGs, [TaskFlow API](https://www.astronomer.io/docs/learn/airflow-decorators/) represent a simpler way to transform functions into tasks, with a more intuitive way of passing data between them. +They were introduced in Airflow 2 as an alternative to Airflow [traditional operators](traditional_operators.md). 
+ +Below, we illustrate how to represent an Airflow DAG using TaskFlow API and how to define the same DAG using +DAG Factory. Ultimately, both implementations use the same Airflow operators. The main difference is the language used +to declare the workflow: while one uses Python, the other uses YAML. + + +## Goal + +Let's say we'd like to create a workflow that performs the following: + +1. Create a list of [PyPI](https://pypi.org/) projects to be analysed +2. Fetch the [statistics](https://pypistats.org/) for each of these projects +3. Summarize the selected statistics as Markdown, using Python. + +We will implement all these steps using the Airflow `task` decorator, and the last task will generate a Markdown table similar to: + +``` +| package_name | last_day | last_month | last_week | +|:------------------|-----------:|-------------:|------------:| +| apache-airflow | 852242 | 28194255 | 6253861 | +| astronomer-cosmos | 442531 | 13354870 | 3127750 | +| dag-factory | 10078 | 354085 | 77752 | +``` + +The main logic is implemented as plain Python functions in [pypi_stats.py](https://github.com/astronomer/dag-factory/blob/main/dev/dags/pypi_stats.py): + +```title="pypi_stats.py" +--8<-- "dev/dags/pypi_stats.py:pypi_stats" +``` + + +## Implementation + +As a reference, the following workflows run using Airflow 2.10.2 and DAG Factory 0.21.0. + +### Plain Airflow Python DAG + +```title="example_pypi_stats_plain_airflow.py" +--8<-- "dev/dags/comparison/example_pypi_stats_plain_airflow.py" +``` + +### Alternative DAG Factory YAML + +```title="example_pypi_stats_dagfactory.yml" +--8<-- "dev/dags/comparison/example_pypi_stats_dagfactory.yml" +``` + + +## Comparison + +### Goal + +Both implementations accomplish the same goal and result in the expected Markdown table. 
+ +### Airflow Graph view + +As it can be seen in the screenshots below, both the DAG created using Python with standard Airflow and the +DAG created using YAML and DAG Factory look identical, from a graph topology perspective, and also from the underlining +operators being used. + +#### Graph view: Plain Airflow Python DAG + +![alt text](../static/example_pypi_stats_plain_airflow_graph.png "Python DAG Graph visualisation") + +#### Graph view: Alternative DAG Factory YAML + +![alt text](../static/example_pypi_stats_dagfactory_graph.png "YAML DAG Graph visualization") + +### Airflow Dynamic Task Mapping + +In both workflows, we are generating dynamically a task for each PyPI repo. + +#### Mapped Tasks: Plain Airflow Python DAG + +![alt text](../static/example_pypi_stats_plain_airflow_mapped_tasks.png "Python DAG mapped tasks") + +#### Mapped Tasks: Alternative DAG Factory YAML + +![alt text](../static/example_pypi_stats_dagfactory_mapped_tasks.png "YAML DAG mapped tasks") + + +### Airflow Code view + +From an Airflow UI perspective, the content displayed in the "Code" view is the main difference between the two implementations. 
While Airflow renders the original Python DAG, as expected, in the case of the YAML DAGs, Airflow displays the Python file that references the DAG Factory YAML files: + +```title="example_load_yaml_dags.py" +--8<-- "dev/dags/example_load_yaml_dags.py" +``` + +#### Code view: Plain Airflow Python DAG + +![alt text](../static/example_pypi_stats_plain_airflow_code.png "Python DAG code visualization") + +#### Code view: Alternative DAG Factory YAML + +![alt text](../static/example_pypi_stats_dagfactory_code.png "YAML DAG code visualization") + +To overcome this limitation, DAG Factory appends the YAML content to the DAG Documentation so users can better troubleshoot +the DAG: + +![alt text](../static/example_pypi_stats_dagfactory_docs.png "YAML DAG docs visualization") diff --git a/docs/comparison/traditional_operators.md b/docs/comparison/traditional_operators.md new file mode 100644 index 00000000..c44290f2 --- /dev/null +++ b/docs/comparison/traditional_operators.md @@ -0,0 +1,95 @@ +# Traditional Operators: Using YAML instead of Python + +Traditionally, operators are Airflow's building blocks, and while they are robust and diverse, +they can sometimes lead to boilerplate-heavy DAGs compared to the newer [TaskFlow API](./taskflow_api.md). + +Most of the Airflow providers come with built-in traditional operators. Some examples include `BashOperator`, `PythonOperator`, `KubernetesPodOperator`, and `PostgresOperator`. + +Below, we illustrate how to represent an Airflow DAG using traditional operators and how to define the same DAG using +DAG Factory. Ultimately, both implementations use the same Airflow operators. The main difference is the language used +to declare the workflow: while one uses Python, the other uses YAML. + + +## Goal + +Let's say we'd like to create a workflow that performs the following: + +1. Retrieve the top ten stories from Hacker News using the [Hacker News API](https://github.com/HackerNews/API). +2. 
Fetch the details for the two top stories using the Hacker News API. +3. Summarize the selected stories as Markdown, using Python. + +We will implement the first two steps using `BashOperator` and the third step using `PythonOperator`. +The last task will generate a `Markdown` snippet similar to: + +``` +| title | url | |:----------------------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------| | I keep turning my Google Sheets into phone-friendly webapps | https://arstechnica.com/gadgets/2024/12/making-tiny-no-code-webapps-out-of-spreadsheets-is-a-weirdly-fulfilling-hobby/ | | Coconut by Meta AI – Better LLM Reasoning with Chain of Continuous Thought? | https://aipapersacademy.com/chain-of-continuous-thought/ | +``` + +The main logic is implemented as plain Python functions in [hacker_news.py](https://github.com/astronomer/dag-factory/blob/main/dev/dags/hacker_news.py): + +```title="hacker_news.py" +--8<-- "dev/dags/hacker_news.py:hacker_news" +``` + + + +## Implementation + +As a reference, the following workflows run using Airflow 2.10.2 and DAG Factory 0.21.0. + +### Plain Airflow Python DAG + +```title="example_hackernews_plain_airflow.py" +--8<-- "dev/dags/comparison/example_hackernews_plain_airflow.py" +``` + +### Alternative DAG Factory YAML + +```title="example_hackernews_dagfactory.yml" +--8<-- "dev/dags/comparison/example_hackernews_dagfactory.yml" +``` + + +## Comparison + +### Goal + +Both implementations accomplish the same goal and result in the expected Markdown table. + +### Airflow Graph view + +As it can be seen in the screenshots below, both the DAG created using Python with standard Airflow and the +DAG created using YAML and DAG Factory look identical, from a graph topology perspective, and also from the underlining +operators being used. 
+ +#### Graph view: Plain Airflow Python DAG + +![alt text](../static/example_hackernews_plain_airflow_graph.png "Python DAG Graph visualisation") + +#### Graph view: Alternative DAG Factory YAML + +![alt text](../static/example_hackernews_dagfactory_graph.png "YAML DAG Graph visualization") + +### Airflow Code view + +From an Airflow UI perspective, the content displayed in the "Code" view is the main difference between the two implementations. While Airflow renders the original Python DAG, as expected, in the case of the YAML DAGs, Airflow displays the Python file that references the DAG Factory YAML files: + +```title="example_load_yaml_dags.py" +--8<-- "dev/dags/example_load_yaml_dags.py" +``` + +#### Code view: Plain Airflow Python DAG + +![alt text](../static/example_hackernews_plain_airflow_code.png "Python DAG code visualization") + +#### Code view: Alternative DAG Factory YAML + +![alt text](../static/example_hackernews_dagfactory_code.png "YAML DAG code visualization") + +To overcome this limitation, DAG Factory appends the YAML content to the DAG Documentation so users can better troubleshoot +the DAG: + +![alt text](../static/example_hackernews_dagfactory_docs.png "YAML DAG docs visualization") diff --git a/docs/index.md b/docs/index.md index 550b54c9..f4b92754 100644 --- a/docs/index.md +++ b/docs/index.md @@ -10,9 +10,9 @@ Are you new to DAG Factory? This is the place to start! 
* [Quickstart with Airflow standalone](getting-started/quick-start-airflow-standalone.md) * [Quickstart with Astro CLI](getting-started/quick-start-astro-cli.md) * Install guide -* Using YAML instead of Python - * Traditional Airflow Operators] - * TaskFlow API +* [Using YAML instead of Python](./comparison/index.md) + * [Traditional Airflow Operators](./comparison/traditional_operators.md) + * [TaskFlow API](./comparison/taskflow_api.md) ## Features diff --git a/docs/static/example_hackernews_dagfactory_code.png b/docs/static/example_hackernews_dagfactory_code.png new file mode 100644 index 00000000..46d889d1 Binary files /dev/null and b/docs/static/example_hackernews_dagfactory_code.png differ diff --git a/docs/static/example_hackernews_dagfactory_docs.png b/docs/static/example_hackernews_dagfactory_docs.png new file mode 100644 index 00000000..270a3087 Binary files /dev/null and b/docs/static/example_hackernews_dagfactory_docs.png differ diff --git a/docs/static/example_hackernews_dagfactory_graph.png b/docs/static/example_hackernews_dagfactory_graph.png new file mode 100644 index 00000000..f9394140 Binary files /dev/null and b/docs/static/example_hackernews_dagfactory_graph.png differ diff --git a/docs/static/example_hackernews_plain_airflow_code.png b/docs/static/example_hackernews_plain_airflow_code.png new file mode 100644 index 00000000..8a0e9a0c Binary files /dev/null and b/docs/static/example_hackernews_plain_airflow_code.png differ diff --git a/docs/static/example_hackernews_plain_airflow_graph.png b/docs/static/example_hackernews_plain_airflow_graph.png new file mode 100644 index 00000000..9eccc065 Binary files /dev/null and b/docs/static/example_hackernews_plain_airflow_graph.png differ diff --git a/docs/static/example_pypi_stats_dagfactory_code.png b/docs/static/example_pypi_stats_dagfactory_code.png new file mode 100644 index 00000000..af0f09b7 Binary files /dev/null and b/docs/static/example_pypi_stats_dagfactory_code.png differ diff --git 
a/docs/static/example_pypi_stats_dagfactory_docs.png b/docs/static/example_pypi_stats_dagfactory_docs.png new file mode 100644 index 00000000..1e026ed0 Binary files /dev/null and b/docs/static/example_pypi_stats_dagfactory_docs.png differ diff --git a/docs/static/example_pypi_stats_dagfactory_graph.png b/docs/static/example_pypi_stats_dagfactory_graph.png new file mode 100644 index 00000000..865e8d9d Binary files /dev/null and b/docs/static/example_pypi_stats_dagfactory_graph.png differ diff --git a/docs/static/example_pypi_stats_dagfactory_mapped_tasks.png b/docs/static/example_pypi_stats_dagfactory_mapped_tasks.png new file mode 100644 index 00000000..335a99b2 Binary files /dev/null and b/docs/static/example_pypi_stats_dagfactory_mapped_tasks.png differ diff --git a/docs/static/example_pypi_stats_plain_airflow_code.png b/docs/static/example_pypi_stats_plain_airflow_code.png new file mode 100644 index 00000000..b980579d Binary files /dev/null and b/docs/static/example_pypi_stats_plain_airflow_code.png differ diff --git a/docs/static/example_pypi_stats_plain_airflow_graph.png b/docs/static/example_pypi_stats_plain_airflow_graph.png new file mode 100644 index 00000000..fc7ffe34 Binary files /dev/null and b/docs/static/example_pypi_stats_plain_airflow_graph.png differ diff --git a/docs/static/example_pypi_stats_plain_airflow_mapped_tasks.png b/docs/static/example_pypi_stats_plain_airflow_mapped_tasks.png new file mode 100644 index 00000000..5d3b419c Binary files /dev/null and b/docs/static/example_pypi_stats_plain_airflow_mapped_tasks.png differ diff --git a/pyproject.toml b/pyproject.toml index f9bab9c4..cb4c4418 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,8 @@ dependencies = [ "build", "dag-factory[tests]", "apache-airflow~={matrix:airflow}.0,!=2.9.0,!=2.9.1", # https://github.com/apache/airflow/pull/39670 - "httpx>=0.25.0" + "httpx>=0.25.0", + "pandas", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} 
{matrix:python}"] diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index c4fb03e6..6215e2dd 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -10,6 +10,10 @@ ls $AIRFLOW_HOME airflow db check +# Necessary for overcoming the following issue with Airflow 2.2: +# ERROR: Cannot install apache-airflow==2.2.0, apache-airflow==2.2.1, apache-airflow==2.2.2, apache-airflow==2.2.3, apache-airflow==2.2.4, apache-airflow==2.2.5, httpx>=0.25.0 and tabulate>=0.9.0 because these package versions have conflicting dependencies. +pip install "tabulate>=0.9.0" + pytest -vv \ --cov=dagfactory \ --cov-report=term-missing \ diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index d2bb8414..9b3aaacf 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -24,6 +24,12 @@ MIN_VER_DAG_FILE_VER: dict[str, list[str]] = { # TaskFlow examples unrelated to dynamic task mapping work in earlier versions "2.3": ["example_dynamic_task_mapping.py", "example_taskflow.py"], + "2.5": [ + "example_pypi_stats_dagfactory", + "example_hackernews_dagfactory", + "example_hackernews_plain_airflow", + "example_pypi_stats_plain_airflow", + ], "2.7": ["example_map_index_template.py"], }