Skip to content

Commit

Permalink
Add pause label for DAG status
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcmessias committed Oct 18, 2021
1 parent df8b2d9 commit 9922a8a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Labels:
* `dag_id`
* `owner`
* `status`
* `paused`

Value: number of dags in a specific status.

Expand All @@ -94,6 +95,7 @@ Labels:
* `dag_id`
* `owner`
* `status`
* `paused`

Value: 0 or 1 depending on wherever the current state of each `dag_id` is `status`.

Expand Down
11 changes: 8 additions & 3 deletions airflow_exporter/prometheus_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class DagStatusInfo:
dag_id: str
status: str
cnt: int
paused: str
owner: str

def get_dag_status_info() -> List[DagStatusInfo]:
Expand All @@ -43,7 +44,7 @@ def get_dag_status_info() -> List[DagStatusInfo]:
sql_res = (
Session.query( # pylint: disable=no-member
dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.cnt,
DagModel.owners
DagModel.is_paused, DagModel.owners
)
.join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id)
.join(SerializedDagModel, SerializedDagModel.dag_id == dag_status_query.c.dag_id)
Expand All @@ -55,6 +56,7 @@ def get_dag_status_info() -> List[DagStatusInfo]:
dag_id = i.dag_id,
status = i.state,
cnt = i.cnt,
paused = str(i.is_paused).lower(),
owner = i.owners
)
for i in sql_res
Expand All @@ -78,7 +80,7 @@ def get_last_dagrun_info() -> List[DagStatusInfo]:
sql_res = (
Session.query(
last_dagrun_query.c.dag_id, last_dagrun_query.c.state, last_dagrun_query.c.row_number,
DagModel.owners
DagModel.is_paused, DagModel.owners
)
.filter(last_dagrun_query.c.row_number == 1)
.join(DagModel, DagModel.dag_id == last_dagrun_query.c.dag_id)
Expand All @@ -90,6 +92,7 @@ def get_last_dagrun_info() -> List[DagStatusInfo]:
DagStatusInfo(
dag_id = i.dag_id,
status = i.state,
paused = str(i.is_paused).lower(),
cnt = 1,
owner = i.owners
)
Expand Down Expand Up @@ -238,6 +241,7 @@ def collect(self) -> Generator[Metric, None, None]:
'dag_id': dag.dag_id,
'owner': dag.owner,
'status': dag.status,
'paused': dag.paused,
**labels
},
dag.cnt,
Expand All @@ -264,6 +268,7 @@ def collect(self) -> Generator[Metric, None, None]:
'dag_id': dag.dag_id,
'owner': dag.owner,
'status': status,
'paused': dag.paused,
**labels
},
int(dag.status == status)
Expand Down Expand Up @@ -345,4 +350,4 @@ class AirflowPrometheusPlugins(AirflowPlugin):
flask_blueprints = [] # type: ignore
menu_links = [] # type: ignore
appbuilder_views = [RBACmetricsView]
appbuilder_menu_items = [] # type: ignore
appbuilder_menu_items = [] # type: ignore

0 comments on commit 9922a8a

Please sign in to comment.