From 9922a8ae2005e9be77450db4121dbdd9cc2715cc Mon Sep 17 00:00:00 2001 From: Daniel Messias Date: Mon, 18 Oct 2021 15:12:06 +0100 Subject: [PATCH] Add pause label for DAG status --- README.md | 2 ++ airflow_exporter/prometheus_exporter.py | 11 ++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c96d6c6..e0de85e 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ Labels: * `dag_id` * `owner` * `status` +* `paused` Value: number of dags in a specific status. @@ -94,6 +95,7 @@ Labels: * `dag_id` * `owner` * `status` +* `paused` Value: 0 or 1 depending on wherever the current state of each `dag_id` is `status`. diff --git a/airflow_exporter/prometheus_exporter.py b/airflow_exporter/prometheus_exporter.py index a05ed31..d7edf50 100644 --- a/airflow_exporter/prometheus_exporter.py +++ b/airflow_exporter/prometheus_exporter.py @@ -28,6 +28,7 @@ class DagStatusInfo: dag_id: str status: str cnt: int + paused: str owner: str def get_dag_status_info() -> List[DagStatusInfo]: @@ -43,7 +44,7 @@ def get_dag_status_info() -> List[DagStatusInfo]: sql_res = ( Session.query( # pylint: disable=no-member dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.cnt, - DagModel.owners + DagModel.is_paused, DagModel.owners ) .join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id) .join(SerializedDagModel, SerializedDagModel.dag_id == dag_status_query.c.dag_id) @@ -55,6 +56,7 @@ def get_dag_status_info() -> List[DagStatusInfo]: dag_id = i.dag_id, status = i.state, cnt = i.cnt, + paused = str(i.is_paused).lower(), owner = i.owners ) for i in sql_res @@ -78,7 +80,7 @@ def get_last_dagrun_info() -> List[DagStatusInfo]: sql_res = ( Session.query( last_dagrun_query.c.dag_id, last_dagrun_query.c.state, last_dagrun_query.c.row_number, - DagModel.owners + DagModel.is_paused, DagModel.owners ) .filter(last_dagrun_query.c.row_number == 1) .join(DagModel, DagModel.dag_id == last_dagrun_query.c.dag_id) @@ -90,6 +92,7 @@ def get_last_dagrun_info() -> List[DagStatusInfo]: DagStatusInfo( dag_id = i.dag_id, status = i.state, + paused = str(i.is_paused).lower(), cnt = 1, owner = i.owners ) @@ -238,6 +241,7 @@ def collect(self) -> Generator[Metric, None, None]: 'dag_id': dag.dag_id, 'owner': dag.owner, 'status': dag.status, + 'paused': dag.paused, **labels }, dag.cnt, @@ -264,6 +268,7 @@ def collect(self) -> Generator[Metric, None, None]: 'dag_id': dag.dag_id, 'owner': dag.owner, 'status': status, + 'paused': dag.paused, **labels }, int(dag.status == status) @@ -345,4 +350,4 @@ class AirflowPrometheusPlugins(AirflowPlugin): flask_blueprints = [] # type: ignore menu_links = [] # type: ignore appbuilder_views = [RBACmetricsView] - appbuilder_menu_items = [] # type: ignore \ No newline at end of file + appbuilder_menu_items = [] # type: ignore