Skip to content

Commit

Permalink
add configuration management and authentication features
Browse files Browse the repository at this point in the history
Resolves: epoch8#54, epoch8#55, epoch8#56

More info:

This commit adds the support for the configuration of the exporter via
`prometheus_exporter` section of Airflow configuration.

The keys with the `expose_` prefix enable or disable the exposure of certain classes
of metrics. For example, Airflow variables-related metrics are not exposed by
default. However, with `expose_variables` set to `True`, the exporter will start
exposing the metrics. Similarly, by default, the exporter exposes scheduler-related
metrics. However, with `expose_scheduler` set to `False`, the exporter will
not export them.

Additionally, when `expose_config` is enabled, the exporter will expose
a subset of `core` and `prometheus_exporter` configuration settings.

```
[prometheus_exporter]
auth_enabled = True
auth_token = 4e1dba1c-2b66-4275-b8ae-292ee9665fa1
expose_variables = True
expose_config = False
expose_scheduler = False
```

It is possible to disable the exporter:

```
[prometheus_exporter]
disabled = True
```

When authentication is enabled, the metrics are accessible via:

```bash
curl -v -H 'Authorization: Bearer 4e1dba1c-2b66-4275-b8ae-292ee9665fa1' https://localhost:8443/admin/metrics/
```

Also, when the authentication is enabled, Prometheus scrape job
might look like this:

```yaml
  - job_name: 'airflow_exporters'
    metrics_path: /admin/metrics/
    scheme: https
    tls_config:
      insecure_skip_verify: true
    bearer_token: '4e1dba1c-2b66-4275-b8ae-292ee9665fa1'
    scrape_interval: 5m
    static_configs:
    - targets:
      - '127.0.0.1:8443'
      labels:
        environment: 'dev'
    relabel_configs:
    - source_labels: [__address__]
      regex: "^(.*):(.*)$"
      target_label: instance
      replacement: ${1}
    - source_labels: [instance]
      regex: "^(127.0.0.1)$"
      target_label: instance
      replacement: "airflow"
```
  • Loading branch information
greenpau committed Aug 3, 2019
1 parent ebaf763 commit d42cd5e
Showing 1 changed file with 223 additions and 104 deletions.
327 changes: 223 additions & 104 deletions airflow_exporter/prometheus_exporter.py
Original file line number Diff line number Diff line change
@@ -1,114 +1,86 @@
from sqlalchemy import func
from sqlalchemy import text

from functools import wraps
from flask import Response
from flask import request, abort
from flask_admin import BaseView, expose

# Views for Flask App Builder
appbuilder_views = []
try:
from flask_appbuilder import BaseView as FABBaseView, expose as FABexpose
class RBACMetrics(FABBaseView):
route_base = "/admin/metrics/"
@FABexpose('/')
def list(self):
return Response(generate_latest(), mimetype='text')


# Metrics View for Flask app builder used in airflow with rbac enabled
RBACmetricsView = {
"view": RBACMetrics(),
"name": "metrics",
"category": "Admin"
}
appbuilder_views = [RBACmetricsView]

except ImportError:
pass


from airflow.plugins_manager import AirflowPlugin
from airflow.settings import Session
from airflow.models import TaskInstance, DagModel, DagRun
from airflow.utils.state import State

# Importing base classes that we need to derive
from prometheus_client import generate_latest, REGISTRY
from prometheus_client.core import GaugeMetricFamily

from contextlib import contextmanager

# Importing classes used in ExporterConfiguration
from collections import OrderedDict
from airflow import configuration
from airflow.utils.log.logging_mixin import LoggingMixin

@contextmanager
def session_scope(session):
    """Transactional scope around a series of ORM operations.

    Yields *session*, commits when the managed block finishes cleanly,
    rolls back and re-raises on any failure (including interrupts),
    and always closes the session afterwards.
    """
    try:
        yield session
        session.commit()
    except BaseException:
        # Equivalent to a bare ``except:`` — even KeyboardInterrupt must
        # trigger a rollback before propagating.
        session.rollback()
        raise
    finally:
        session.close()


def get_dag_state_info():
    '''get dag info

    Counts DagRuns grouped by (dag_id, state) and joins in the owning
    DAG's ``owners`` column.
    :return dag_info
    '''
    with session_scope(Session) as session:
        # Aggregate first in a subquery, then join owner metadata.
        run_counts = (
            session.query(
                DagRun.dag_id,
                DagRun.state,
                func.count(DagRun.state).label('count'),
            )
            .group_by(DagRun.dag_id, DagRun.state)
            .subquery()
        )
        joined = session.query(
            run_counts.c.dag_id,
            run_counts.c.state,
            run_counts.c.count,
            DagModel.owners,
        ).join(DagModel, DagModel.dag_id == run_counts.c.dag_id)
        return joined.all()


def get_task_state_info():
    '''get task info

    Counts TaskInstances grouped by (dag_id, task_id, state) and joins in
    the owning DAG's ``owners`` column.
    :return task_info: rows of (dag_id, task_id, state, value, owners)
    '''
    with session_scope(Session) as session:
        # Aggregate first in a subquery so the owner join stays simple.
        task_status_query = session.query(
            TaskInstance.dag_id, TaskInstance.task_id,
            TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
        ).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()
        return session.query(
            task_status_query.c.dag_id, task_status_query.c.task_id,
            task_status_query.c.state, task_status_query.c.value, DagModel.owners
        ).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).all()


def get_dag_duration_info():
    '''get duration of currently running DagRuns
    :return dag_info
    '''
    # Elapsed-time SQL differs per DB driver: SQLite has no interval
    # arithmetic (julianday returns fractional days, hence * 86400 for
    # seconds), MySQL uses TIMESTAMPDIFF, and the default path relies on
    # native datetime subtraction (e.g. PostgreSQL intervals).
    driver = Session.bind.driver # pylint: disable=no-member
    durations = {
        'pysqlite': func.sum(
            (func.julianday(func.current_timestamp()) - func.julianday(DagRun.start_date)) * 86400.0
        ),
        'mysqldb': func.sum(func.timestampdiff(text('second'), DagRun.start_date, func.now())),
        'default': func.sum(func.now() - DagRun.start_date)
    }
    # Unknown drivers fall back to native datetime subtraction.
    duration = durations.get(driver, durations['default'])

    with session_scope(Session) as session:
        # Only DagRuns still in the RUNNING state are reported.
        return session.query(
            DagRun.dag_id,
            DagRun.run_id,
            duration.label('duration')
        ).group_by(
            DagRun.dag_id,
            DagRun.run_id
        ).filter(
            DagRun.state == State.RUNNING
        ).all()
from airflow import jobs

# Importing Prometheus client classes
from prometheus_client import generate_latest, REGISTRY
from prometheus_client.core import GaugeMetricFamily

class ExporterConfiguration(object):
    '''Prometheus Exporter Configuration.

    Reads the ``[prometheus_exporter]`` section of the Airflow
    configuration (when present) to decide:

    * whether the exporter is enabled at all (``disabled``),
    * whether scrapes require a bearer token (``auth_enabled`` and
      ``auth_token``),
    * which metric groups are exposed (``expose_<metric>`` keys).
    '''

    def __init__(self):
        self.enabled = True
        self.log = LoggingMixin().log
        self.name = 'prometheus_exporter'
        # Authentication settings for the /admin/metrics/ endpoint.
        self.config = {
            'auth_enabled': False,
            'auth_token': ''
        }
        # Metric groups exposed by default; adjusted by expose_* keys.
        self.exposed_metrics = [
            'scheduler',
            'config'
        ]
        # Every metric group the exporter knows how to serve.
        self.supported_metrics = [
            'variables',
            'config',
            'scheduler',
        ]
        self._load_config()

    def _load_config(self):
        '''Apply the ``[prometheus_exporter]`` Airflow config section, if any.

        :raises Exception: when authentication is enabled without a usable
            token, or when an ``expose_*`` key names an unsupported metric.
        '''
        if self.name not in configuration.conf:
            return
        params = configuration.conf[self.name]
        if 'disabled' in params and params['disabled'] in ['True', True]:
            self.enabled = False
            # BUG FIX: the original logged '%s is disabled' without
            # supplying the format argument.
            self.log.info('%s is disabled', self.name)
            return
        if 'auth_enabled' in params and params['auth_enabled'] in ['True', True]:
            self.config['auth_enabled'] = True
            self.log.info('%s bearer token authentication is enabled', self.name)
        if self.config['auth_enabled'] is True:
            if 'auth_token' in params:
                self.config['auth_token'] = str(params['auth_token']).strip()
            else:
                # BUG FIX: the original raised Exception(s, ...) where the
                # name `s` was undefined, masking the real error with a
                # NameError.
                raise Exception('auth_enabled is True, but auth_token not found')
            if self.config['auth_token'] == '':
                raise Exception('auth_enabled is True, but auth_token is empty')
        # expose_<metric> = True adds the metric group; any other value
        # removes it from the exposed set.
        for key in params:
            if not key.startswith('expose_'):
                continue
            metric = str(key.replace('expose_', ''))
            if params[key] in [True, 'True']:
                if metric not in self.exposed_metrics:
                    self.exposed_metrics.append(metric)
            elif metric in self.exposed_metrics:
                self.exposed_metrics.remove(metric)
        for metric in self.exposed_metrics:
            if metric not in self.supported_metrics:
                # BUG FIX: same undefined-name `s` defect as above.
                raise Exception('metric %s is unsupported' % metric)
            self.log.info('%s exposes %s metric', self.name, metric)

class MetricsCollector(object):
'''collection of metrics for prometheus'''
Expand All @@ -120,7 +92,7 @@ def collect(self):
'''collect metrics'''

# Task metrics
task_info = get_task_state_info()
task_info = self.get_task_state_info()
t_state = GaugeMetricFamily(
'airflow_task_status',
'Shows the number of task starts with this status',
Expand All @@ -131,7 +103,7 @@ def collect(self):
yield t_state

# Dag Metrics
dag_info = get_dag_state_info()
dag_info = self.get_dag_state_info()
d_state = GaugeMetricFamily(
'airflow_dag_status',
'Shows the number of dag starts with this status',
Expand All @@ -147,26 +119,173 @@ def collect(self):
'Duration of currently running dag_runs in seconds',
labels=['dag_id', 'run_id']
)
driver = Session.bind.driver # pylint: disable=no-member
for dag in get_dag_duration_info():
driver = Session.bind.driver
for dag in self.get_dag_duration_info():
if driver == 'mysqldb' or driver == 'pysqlite':
dag_duration.add_metric([dag.dag_id, dag.run_id], dag.duration)
else:
dag_duration.add_metric([dag.dag_id, dag.run_id], dag.duration.seconds)
yield dag_duration


REGISTRY.register(MetricsCollector())

# Configuration metrics TODO
if 'config' in exporter.exposed_metrics:
pass

# Variables metrics TODO
if 'variables' in exporter.exposed_metrics:
pass

# Scheduler metrics
if 'scheduler' in exporter.exposed_metrics:
scheduler_up = GaugeMetricFamily(
'airflow_scheduler_up',
'Returns whether airflow scheduler is up (1) or down (0)',
labels=[]
)
scheduler_status = self.get_scheduler_status()
scheduler_up.add_metric([], scheduler_status)
yield scheduler_up

return


@contextmanager
def session_scope(self, session):
    """
    Provide a transactional scope around a series of operations.

    Yields *session*, commits when the managed block finishes cleanly,
    rolls back and re-raises on any failure, and always closes the
    session. (BUG FIX: dropped the original's redundant trailing
    ``return`` and replaced the bare ``except:`` with the equivalent
    explicit ``except BaseException:``.)
    """
    try:
        yield session
        session.commit()
    except BaseException:
        # Even KeyboardInterrupt must trigger a rollback before it
        # propagates, so BaseException (not Exception) is correct here.
        session.rollback()
        raise
    finally:
        session.close()


def get_dag_state_info(self):
    '''get dag info

    Counts DagRuns grouped by (dag_id, state) and joins in the owning
    DAG's ``owners`` column. (BUG FIX: removed the original's
    unreachable trailing ``return`` after the ``with``-block return.)
    :return dag_info: rows of (dag_id, state, count, owners)
    '''
    with self.session_scope(Session) as session:
        # Aggregate first in a subquery so the owner join stays simple.
        dag_status_query = session.query(
            DagRun.dag_id, DagRun.state, func.count(DagRun.state).label('count')
        ).group_by(DagRun.dag_id, DagRun.state).subquery()
        return session.query(
            dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.count,
            DagModel.owners
        ).join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id).all()


def get_task_state_info(self):
    '''get task info

    Counts TaskInstances grouped by (dag_id, task_id, state) and joins
    in the owning DAG's ``owners`` column. (BUG FIX: removed the
    original's unreachable trailing ``return``.)
    :return task_info: rows of (dag_id, task_id, state, value, owners)
    '''
    with self.session_scope(Session) as session:
        # Aggregate first in a subquery so the owner join stays simple.
        task_status_query = session.query(
            TaskInstance.dag_id, TaskInstance.task_id,
            TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
        ).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()
        return session.query(
            task_status_query.c.dag_id, task_status_query.c.task_id,
            task_status_query.c.state, task_status_query.c.value, DagModel.owners
        ).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).all()


def get_dag_duration_info(self):
    '''get duration of currently running DagRuns

    (BUG FIX: removed the original's unreachable trailing ``return``.)
    :return dag_info: rows of (dag_id, run_id, duration)
    '''
    # Elapsed-time SQL differs per DB driver: SQLite has no interval
    # arithmetic (julianday returns fractional days, hence * 86400 for
    # seconds), MySQL uses TIMESTAMPDIFF, and the default path relies on
    # native datetime subtraction (e.g. PostgreSQL intervals).
    driver = Session.bind.driver
    durations = {
        'pysqlite': func.sum(
            (func.julianday(func.current_timestamp()) - func.julianday(DagRun.start_date)) * 86400.0
        ),
        'mysqldb': func.sum(func.timestampdiff(text('second'), DagRun.start_date, func.now())),
        'default': func.sum(func.now() - DagRun.start_date)
    }
    duration = durations.get(driver, durations['default'])

    with self.session_scope(Session) as session:
        # Only DagRuns still in the RUNNING state are reported.
        return session.query(
            DagRun.dag_id,
            DagRun.run_id,
            duration.label('duration')
        ).group_by(
            DagRun.dag_id,
            DagRun.run_id
        ).filter(
            DagRun.state == State.RUNNING
        ).all()

def get_scheduler_status(self):
    '''get scheduler status

    :return scheduler_status: 1 when the most recent SchedulerJob
        heartbeat is alive, 0 otherwise (including when the status
        cannot be determined at all).
    '''
    try:
        scheduler_job = jobs.SchedulerJob.most_recent_job()
        # BUG FIX: the original also assigned latest_scheduler_heartbeat,
        # which was never used; dropped.
        if scheduler_job and scheduler_job.is_alive():
            return 1
    except Exception:
        # Narrowed from a bare `except:`; any failure while querying the
        # job table is reported as "scheduler down" rather than crashing
        # the scrape.
        pass
    return 0


def authenticate_scrape(f):
    '''Decorator guarding metrics endpoints.

    Aborts with 503 when the exporter is disabled. When bearer-token
    authentication is enabled, aborts with 401 unless one of the
    recognized auth headers carries the configured token; otherwise the
    wrapped view runs normally.
    '''
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not exporter.enabled:
            abort(503)
        if not exporter.config['auth_enabled']:
            return f(*args, **kwargs)
        # Accept the token from any of these commonly used headers.
        auth_headers = ['Authorization', 'X-Auth-Token', 'X-Token', 'x-token', 'access_token']
        auth_header = next((h for h in auth_headers if h in request.headers), None)
        if not auth_header:
            abort(401)
        # BUG FIX: the original did str(value.encode('ascii', 'ignore')),
        # which on Python 3 produces the repr "b'...'" and therefore can
        # never equal the configured token; decode back to str instead.
        value = request.headers[auth_header].encode('ascii', 'ignore').decode('ascii').strip()
        token = value.replace('Bearer ', '').replace('bearer ', '')
        if token != exporter.config['auth_token']:
            abort(401)
        return f(*args, **kwargs)
    return decorated_function

class Metrics(BaseView):
    # Plain flask_admin view serving the Prometheus scrape endpoint
    # (non-RBAC Airflow UI); access is gated by authenticate_scrape.
    @expose('/')
    @authenticate_scrape
    def index(self):
        # generate_latest() renders every collector registered in REGISTRY.
        return Response(generate_latest(), mimetype='text/plain')

# Singleton configuration consulted by authenticate_scrape and the views.
exporter = ExporterConfiguration()

from flask_appbuilder import BaseView as FABBaseView, expose as FABexpose

class RBACMetrics(FABBaseView):
    # Serve the Prometheus scrape endpoint under the RBAC admin UI.
    route_base = "/admin/metrics/"

    @FABexpose('/')
    def list(self):
        # BUG FIX: mimetype was 'text', which is not a valid MIME type;
        # use 'text/plain' to match the non-RBAC Metrics view.
        return Response(generate_latest(), mimetype='text/plain')

# Metrics View for Flask app builder used in airflow with RBAC enabled
RBACmetricsView = {
    "view": RBACMetrics(),
    "name": "metrics",
    "category": "Admin"
}
appbuilder_views = [RBACmetricsView]

REGISTRY.register(MetricsCollector())

# BUG FIX: ADMIN_VIEW was assigned twice in the original (identical
# expressions before and after the RBAC view definitions); a single
# assignment suffices.
ADMIN_VIEW = Metrics(category="Admin", name="Metrics")

class AirflowPrometheusPlugins(AirflowPlugin):
'''plugin for show metrics'''
Expand Down

0 comments on commit d42cd5e

Please sign in to comment.