-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
96845cf
commit 2b6f72e
Showing
1 changed file
with
115 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
From 3e5925b3fc10a6e9de37e7c5156faddc711a9f13 Mon Sep 17 00:00:00 2001 | ||
From: Kostya Esmukov <[email protected]> | ||
Date: Sun, 14 Jul 2019 13:15:54 +0300 | ||
Subject: [PATCH] Add support for declarative dags provided by | ||
airflow-declarative project | ||
|
||
--- | ||
airflow/models/dagbag.py | 24 ++++++++++++++++++------ | ||
airflow/utils/file.py | 4 +++- | ||
airflow/www/views.py | 5 ++++- | ||
setup.cfg | 1 + | ||
4 files changed, 26 insertions(+), 8 deletions(-) | ||
|
||
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py | ||
index b37482aae..a96cbb9fd 100644 | ||
--- a/airflow/models/dagbag.py | ||
+++ b/airflow/models/dagbag.py | ||
@@ -24,6 +24,7 @@ import os | ||
import sys | ||
import textwrap | ||
import traceback | ||
+import types | ||
import warnings | ||
import zipfile | ||
from datetime import datetime, timedelta | ||
@@ -309,7 +310,7 @@ class DagBag(LoggingMixin): | ||
return [] | ||
|
||
self.log.debug("Importing %s", filepath) | ||
- org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1]) | ||
+ org_mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1]) | ||
path_hash = hashlib.sha1(filepath.encode('utf-8')).hexdigest() | ||
mod_name = f'unusual_prefix_{path_hash}_{org_mod_name}' | ||
|
||
@@ -319,11 +320,22 @@ class DagBag(LoggingMixin): | ||
timeout_msg = f"DagBag import timeout for {filepath} after {self.DAGBAG_IMPORT_TIMEOUT}s" | ||
with timeout(self.DAGBAG_IMPORT_TIMEOUT, error_message=timeout_msg): | ||
try: | ||
- loader = importlib.machinery.SourceFileLoader(mod_name, filepath) | ||
- spec = importlib.util.spec_from_loader(mod_name, loader) | ||
- new_module = importlib.util.module_from_spec(spec) | ||
- sys.modules[spec.name] = new_module | ||
- loader.exec_module(new_module) | ||
+ if file_ext in ('.yaml', '.yml'): | ||
+ # Avoid the possibility of cyclic imports error | ||
+ # by importing Declarative here, in a function: | ||
+ import airflow_declarative | ||
+ | ||
+ declarative_dags_list = airflow_declarative.from_path(filepath) | ||
+ new_module = types.ModuleType(mod_name) | ||
+ sys.modules[mod_name] = new_module | ||
+ for i, dag in enumerate(declarative_dags_list): | ||
+ setattr(new_module, "dag%s" % i, dag) | ||
+ else: | ||
+ loader = importlib.machinery.SourceFileLoader(mod_name, filepath) | ||
+ spec = importlib.util.spec_from_loader(mod_name, loader) | ||
+ new_module = importlib.util.module_from_spec(spec) | ||
+ sys.modules[spec.name] = new_module | ||
+ loader.exec_module(new_module) | ||
return [new_module] | ||
except Exception as e: | ||
self.log.exception("Failed to import: %s", filepath) | ||
diff --git a/airflow/utils/file.py b/airflow/utils/file.py | ||
index c02207951..dbfe455a9 100644 | ||
--- a/airflow/utils/file.py | ||
+++ b/airflow/utils/file.py | ||
@@ -189,7 +189,7 @@ def find_dag_file_paths(directory: Union[str, "pathlib.Path"], safe_mode: bool) | ||
if not os.path.isfile(file_path): | ||
continue | ||
_, file_ext = os.path.splitext(os.path.split(file_path)[-1]) | ||
- if file_ext != '.py' and not zipfile.is_zipfile(file_path): | ||
+ if file_ext not in ('.py', '.yaml', '.yml') and not zipfile.is_zipfile(file_path): | ||
continue | ||
if not might_contain_dag(file_path, safe_mode): | ||
continue | ||
@@ -215,6 +215,8 @@ def might_contain_dag(file_path: str, safe_mode: bool, zip_file: Optional[zipfil | ||
""" | ||
if not safe_mode: | ||
return True | ||
+ if not file_path.endswith(".py"): | ||
+ return True | ||
if zip_file: | ||
with zip_file.open(file_path) as current_file: | ||
content = current_file.read() | ||
diff --git a/airflow/www/views.py b/airflow/www/views.py | ||
index 830047bde..105d6a3e7 100644 | ||
--- a/airflow/www/views.py | ||
+++ b/airflow/www/views.py | ||
@@ -877,7 +877,10 @@ class Airflow(AirflowBaseView): | ||
dag_id = request.args.get('dag_id') | ||
dag_orm = DagModel.get_dagmodel(dag_id, session=session) | ||
code = DagCode.get_code_by_fileloc(dag_orm.fileloc) | ||
- html_code = Markup(highlight(code, lexers.PythonLexer(), HtmlFormatter(linenos=True))) | ||
+ if dag_orm.fileloc.endswith(('.yml', '.yaml')): | ||
+ html_code = Markup(highlight(code, lexers.YamlLexer(), HtmlFormatter(linenos=True))) | ||
+ else: | ||
+ html_code = Markup(highlight(code, lexers.PythonLexer(), HtmlFormatter(linenos=True))) | ||
|
||
except Exception as e: | ||
all_errors += ( | ||
diff --git a/setup.cfg b/setup.cfg | ||
index d3c5f574c..fc7e25241 100644 | ||
--- a/setup.cfg | ||
+++ b/setup.cfg | ||
@@ -77,6 +77,7 @@ setup_requires = | ||
# DEPENDENCIES_EPOCH_NUMBER in the Dockerfile.ci | ||
##################################################################################################### | ||
install_requires = | ||
+ airflow-declarative | ||
alembic>=1.2, <2.0 | ||
argcomplete~=1.10 | ||
attrs>=20.0, <21.0 | ||
-- | ||
2.33.0 | ||
|