From d42cd5e9d9f134101eeae9ddd19321ee9782c22f Mon Sep 17 00:00:00 2001
From: Paul Greenberg
Date: Sat, 3 Aug 2019 00:36:19 -0400
Subject: [PATCH] add configuration management and authentication features

Resolves: epoch8/airflow-exporter#54, epoch8/airflow-exporter#55, epoch8/airflow-exporter#56

More info:

This commit adds support for configuring the exporter via the
`prometheus_exporter` section of the Airflow configuration. Keys with the
`expose_` prefix enable or disable the exposure of a particular class of
metrics. For example, Airflow variables-related metrics are not exposed by
default, but with `expose_variables` set to `True` the exporter starts
exposing them. Conversely, the exporter exposes scheduler-related metrics by
default, but with `expose_scheduler` set to `False` it stops exporting them.
Additionally, when `expose_config` is enabled, the exporter exposes a subset
of the `core` and `prometheus_exporter` configuration settings.

```
[prometheus_exporter]
auth_enabled = True
auth_token = 4e1dba1c-2b66-4275-b8ae-292ee9665fa1
expose_variables = True
expose_config = False
expose_scheduler = False
```

It is also possible to disable the exporter entirely:

```
[prometheus_exporter]
disabled = True
```

When authentication is enabled, the metrics are accessible via:

```bash
curl -v -H 'Authorization: Bearer 4e1dba1c-2b66-4275-b8ae-292ee9665fa1' \
  https://localhost:8443/admin/metrics/
```

Also, when authentication is enabled, a Prometheus scrape job might look
like this:

```yaml
  - job_name: 'airflow_exporters'
    metrics_path: /admin/metrics/
    scheme: https
    tls_config:
      insecure_skip_verify: true
    bearer_token: '4e1dba1c-2b66-4275-b8ae-292ee9665fa1'
    scrape_interval: 5m
    static_configs:
      - targets:
          - '127.0.0.1:8443'
        labels:
          environment: 'dev'
    relabel_configs:
      - source_labels: [__address__]
        regex: "^(.*):(.*)$"
        target_label: instance
        replacement: ${1}
      - source_labels: [instance]
        regex: "^(127.0.0.1)$"
        target_label: instance
        replacement: "airflow"
```
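For scripted checks, the same authenticated scrape can be reproduced from
Python. Below is a minimal sketch using the third-party `requests` package
(not part of this change); the URL and token simply mirror the illustrative
values from the examples above:

```python
import requests

# Illustrative values: reuse the auth_token from [prometheus_exporter]
URL = 'https://localhost:8443/admin/metrics/'
TOKEN = '4e1dba1c-2b66-4275-b8ae-292ee9665fa1'

# verify=False mirrors insecure_skip_verify above (self-signed certificate)
resp = requests.get(URL, headers={'Authorization': 'Bearer ' + TOKEN}, verify=False)
resp.raise_for_status()  # expect 401 without a valid token, 503 when disabled
print(resp.text)
```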
---
 airflow_exporter/prometheus_exporter.py | 327 ++++++++++++++++--
 1 file changed, 223 insertions(+), 104 deletions(-)

diff --git a/airflow_exporter/prometheus_exporter.py b/airflow_exporter/prometheus_exporter.py
index 45cbd81..fd08897 100644
--- a/airflow_exporter/prometheus_exporter.py
+++ b/airflow_exporter/prometheus_exporter.py
@@ -1,114 +1,86 @@
 from sqlalchemy import func
 from sqlalchemy import text
+from functools import wraps

 from flask import Response
+from flask import request, abort
 from flask_admin import BaseView, expose

-# Views for Flask App Builder
-appbuilder_views = []
-try:
-    from flask_appbuilder import BaseView as FABBaseView, expose as FABexpose
-    class RBACMetrics(FABBaseView):
-        route_base = "/admin/metrics/"
-        @FABexpose('/')
-        def list(self):
-            return Response(generate_latest(), mimetype='text')
-
-
-    # Metrics View for Flask app builder used in airflow with rbac enabled
-    RBACmetricsView = {
-        "view": RBACMetrics(),
-        "name": "metrics",
-        "category": "Admin"
-    }
-    appbuilder_views = [RBACmetricsView]
-
-except ImportError:
-    pass
-
-
 from airflow.plugins_manager import AirflowPlugin
 from airflow.settings import Session
 from airflow.models import TaskInstance, DagModel, DagRun
 from airflow.utils.state import State

-# Importing base classes that we need to derive
-from prometheus_client import generate_latest, REGISTRY
-from prometheus_client.core import GaugeMetricFamily
-
 from contextlib import contextmanager

+# Imports used by ExporterConfiguration
+from collections import OrderedDict
+from airflow import configuration
+from airflow.utils.log.logging_mixin import LoggingMixin
-
-@contextmanager
-def session_scope(session):
-    """
-    Provide a transactional scope around a series of operations.
-    """
-    try:
-        yield session
-        session.commit()
-    except:
-        session.rollback()
-        raise
-    finally:
-        session.close()
-
-
-def get_dag_state_info():
-    '''get dag info
-    :return dag_info
-    '''
-    with session_scope(Session) as session:
-        dag_status_query = session.query(
-            DagRun.dag_id, DagRun.state, func.count(DagRun.state).label('count')
-        ).group_by(DagRun.dag_id, DagRun.state).subquery()
-        return session.query(
-            dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.count,
-            DagModel.owners
-        ).join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id).all()
-
-
-def get_task_state_info():
-    '''get task info
-    :return task_info
-    '''
-    with session_scope(Session) as session:
-        task_status_query = session.query(
-            TaskInstance.dag_id, TaskInstance.task_id,
-            TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
-        ).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()
-        return session.query(
-            task_status_query.c.dag_id, task_status_query.c.task_id,
-            task_status_query.c.state, task_status_query.c.value, DagModel.owners
-        ).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).all()
-
-
-def get_dag_duration_info():
-    '''get duration of currently running DagRuns
-    :return dag_info
-    '''
-    driver = Session.bind.driver # pylint: disable=no-member
-    durations = {
-        'pysqlite': func.sum(
-            (func.julianday(func.current_timestamp()) - func.julianday(DagRun.start_date)) * 86400.0
-        ),
-        'mysqldb': func.sum(func.timestampdiff(text('second'), DagRun.start_date, func.now())),
-        'default': func.sum(func.now() - DagRun.start_date)
-    }
-    duration = durations.get(driver, durations['default'])
-
-    with session_scope(Session) as session:
-        return session.query(
-            DagRun.dag_id,
-            DagRun.run_id,
-            duration.label('duration')
-        ).group_by(
-            DagRun.dag_id,
-            DagRun.run_id
-        ).filter(
-            DagRun.state == State.RUNNING
-        ).all()
+from airflow import jobs
+
+# Prometheus client classes
+from prometheus_client import generate_latest, REGISTRY
+from prometheus_client.core import GaugeMetricFamily
+
+
+class ExporterConfiguration(object):
+    '''Prometheus Exporter configuration.'''
+
+    def __init__(self):
+        self.enabled = True
+        self.log = LoggingMixin().log
+        self.name = 'prometheus_exporter'
+        self.config = {
+            'auth_enabled': False,
+            'auth_token': ''
+        }
+        self.exposed_metrics = [
+            'scheduler',
+            'config'
+        ]
+        self.supported_metrics = [
+            'variables',
+            'config',
+            'scheduler',
+        ]
+        self._load_config()
+
+    def _load_config(self):
+        if self.name not in configuration.conf:
+            return
+        params = configuration.conf[self.name]
+        if params.get('disabled') in ['True', True]:
+            self.enabled = False
+            self.log.info('%s is disabled', self.name)
+            return
+        if params.get('auth_enabled') in ['True', True]:
+            self.config['auth_enabled'] = True
+            self.log.info('%s bearer token authentication is enabled', self.name)
+        if self.config['auth_enabled'] is True:
+            if 'auth_token' not in params:
+                raise Exception('%s: auth_enabled is True, but auth_token not found' % self.name)
+            self.config['auth_token'] = str(params['auth_token']).strip()
+            if self.config['auth_token'] == '':
+                raise Exception('%s: auth_enabled is True, but auth_token is empty' % self.name)
+        for k in params:
+            if k.startswith('expose_'):
+                metric = str(k.replace('expose_', ''))
+                if params[k] in [True, 'True']:
+                    if metric not in self.exposed_metrics:
+                        self.exposed_metrics.append(metric)
+                elif metric in self.exposed_metrics:
+                    self.exposed_metrics.remove(metric)
+        for metric in self.exposed_metrics:
+            if metric not in self.supported_metrics:
+                raise Exception('%s: metric %s is unsupported' % (self.name, metric))
+            self.log.info('%s exposes %s metrics', self.name, metric)
+
+
 class MetricsCollector(object):
     '''collection of metrics for prometheus'''
@@ -120,7 +92,7 @@ def collect(self):
         '''collect metrics'''

         # Task metrics
-        task_info = get_task_state_info()
+        task_info = self.get_task_state_info()
         t_state = GaugeMetricFamily(
             'airflow_task_status',
             'Shows the number of task starts with this status',
@@ -131,7 +103,7 @@ def collect(self):
         yield t_state

         # Dag Metrics
-        dag_info = get_dag_state_info()
+        dag_info = self.get_dag_state_info()
         d_state = GaugeMetricFamily(
             'airflow_dag_status',
             'Shows the number of dag starts with this status',
@@ -147,26 +119,173 @@ def collect(self):
             'Duration of currently running dag_runs in seconds',
             labels=['dag_id', 'run_id']
         )
-        driver = Session.bind.driver # pylint: disable=no-member
-        for dag in get_dag_duration_info():
+        driver = Session.bind.driver
+        for dag in self.get_dag_duration_info():
             if driver == 'mysqldb' or driver == 'pysqlite':
                 dag_duration.add_metric([dag.dag_id, dag.run_id], dag.duration)
             else:
                 dag_duration.add_metric([dag.dag_id, dag.run_id], dag.duration.seconds)
         yield dag_duration

-
-REGISTRY.register(MetricsCollector())
-
+        # Configuration metrics TODO
+        if 'config' in exporter.exposed_metrics:
+            pass
+
+        # Variables metrics TODO
+        if 'variables' in exporter.exposed_metrics:
+            pass
+
+        # Scheduler metrics
+        if 'scheduler' in exporter.exposed_metrics:
+            scheduler_up = GaugeMetricFamily(
+                'airflow_scheduler_up',
+                'Returns whether airflow scheduler is up (1) or down (0)',
+                labels=[]
+            )
+            scheduler_status = self.get_scheduler_status()
+            scheduler_up.add_metric([], scheduler_status)
+            yield scheduler_up
+
+    @contextmanager
+    def session_scope(self, session):
+        """
+        Provide a transactional scope around a series of operations.
+        """
+        try:
+            yield session
+            session.commit()
+        except:
+            session.rollback()
+            raise
+        finally:
+            session.close()
+
+    def get_dag_state_info(self):
+        '''get dag info
+        :return dag_info
+        '''
+        with self.session_scope(Session) as session:
+            dag_status_query = session.query(
+                DagRun.dag_id, DagRun.state, func.count(DagRun.state).label('count')
+            ).group_by(DagRun.dag_id, DagRun.state).subquery()
+            return session.query(
+                dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.count,
+                DagModel.owners
+            ).join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id).all()
+
+    def get_task_state_info(self):
+        '''get task info
+        :return task_info
+        '''
+        with self.session_scope(Session) as session:
+            task_status_query = session.query(
+                TaskInstance.dag_id, TaskInstance.task_id,
+                TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
+            ).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()
+            return session.query(
+                task_status_query.c.dag_id, task_status_query.c.task_id,
+                task_status_query.c.state, task_status_query.c.value, DagModel.owners
+            ).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).all()
+
+    def get_dag_duration_info(self):
+        '''get duration of currently running DagRuns
+        :return dag_info
+        '''
+        driver = Session.bind.driver  # pylint: disable=no-member
+        durations = {
+            'pysqlite': func.sum(
+                (func.julianday(func.current_timestamp()) - func.julianday(DagRun.start_date)) * 86400.0
+            ),
+            'mysqldb': func.sum(func.timestampdiff(text('second'), DagRun.start_date, func.now())),
+            'default': func.sum(func.now() - DagRun.start_date)
+        }
+        duration = durations.get(driver, durations['default'])
+
+        with self.session_scope(Session) as session:
+            return session.query(
+                DagRun.dag_id,
+                DagRun.run_id,
+                duration.label('duration')
+            ).group_by(
+                DagRun.dag_id,
+                DagRun.run_id
+            ).filter(
+                DagRun.state == State.RUNNING
+            ).all()
+ """ + try: + yield session + session.commit() + except: + session.rollback() + raise + finally: + session.close() + return + + + def get_dag_state_info(self): + '''get dag info + :return dag_info + ''' + with self.session_scope(Session) as session: + dag_status_query = session.query( + DagRun.dag_id, DagRun.state, func.count(DagRun.state).label('count') + ).group_by(DagRun.dag_id, DagRun.state).subquery() + return session.query( + dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.count, + DagModel.owners + ).join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id).all() + return + + + def get_task_state_info(self): + '''get task info + :return task_info + ''' + with self.session_scope(Session) as session: + task_status_query = session.query( + TaskInstance.dag_id, TaskInstance.task_id, + TaskInstance.state, func.count(TaskInstance.dag_id).label('value') + ).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery() + return session.query( + task_status_query.c.dag_id, task_status_query.c.task_id, + task_status_query.c.state, task_status_query.c.value, DagModel.owners + ).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).all() + return + + + def get_dag_duration_info(self): + '''get duration of currently running DagRuns + :return dag_info + ''' + driver = Session.bind.driver + durations = { + 'pysqlite': func.sum( + (func.julianday(func.current_timestamp()) - func.julianday(DagRun.start_date)) * 86400.0 + ), + 'mysqldb': func.sum(func.timestampdiff(text('second'), DagRun.start_date, func.now())), + 'default': func.sum(func.now() - DagRun.start_date) + } + duration = durations.get(driver, durations['default']) + + with self.session_scope(Session) as session: + return session.query( + DagRun.dag_id, + DagRun.run_id, + duration.label('duration') + ).group_by( + DagRun.dag_id, + DagRun.run_id + ).filter( + DagRun.state == State.RUNNING + ).all() + return + + def get_scheduler_status(self): + '''get scheduler status + :return scheduler_status + ''' + try: + scheduler_job = jobs.SchedulerJob.most_recent_job() + if scheduler_job: + latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat() + if scheduler_job.is_alive(): + return 1 + except: + pass + return 0 + + +def authenticate_scrape(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not exporter.enabled: + abort(503) + if exporter.config['auth_enabled'] == False: + return f(*args, **kwargs) + auth_header = None + auth_headers = ['Authorization', 'X-Auth-Token', 'X-Token', 'x-token', 'access_token'] + for h in auth_headers: + if h in request.headers: + auth_header = h + break + if not auth_header: + abort(401) + data = request.headers[auth_header].encode('ascii','ignore') + token = str.replace(str(data.strip()), 'Bearer ','').replace('bearer ','') + if token != exporter.config['auth_token']: + abort(401) + return f(*args, **kwargs) + return decorated_function class Metrics(BaseView): @expose('/') + @authenticate_scrape def index(self): return Response(generate_latest(), mimetype='text/plain') +exporter = ExporterConfiguration() -ADMIN_VIEW = Metrics(category="Admin", name="Metrics") +from flask_appbuilder import BaseView as FABBaseView, expose as FABexpose +class RBACMetrics(FABBaseView): + route_base = "/admin/metrics/" + @FABexpose('/') + def list(self): + return Response(generate_latest(), mimetype='text') + +# Metrics View for Flask app builder used in airflow with RBAC enabled +RBACmetricsView = { + "view": RBACMetrics(), + "name": "metrics", + 
"category": "Admin" +} +appbuilder_views = [RBACmetricsView] +REGISTRY.register(MetricsCollector()) + +ADMIN_VIEW = Metrics(category="Admin", name="Metrics") class AirflowPrometheusPlugins(AirflowPlugin): '''plugin for show metrics'''