forked from Greenstand/treetracker-airflow-dags
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pre_request_reporting_card.py
71 lines (65 loc) · 2.37 KB
/
pre_request_reporting_card.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from datetime import datetime, timedelta
from textwrap import dedent
from pprint import pprint
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import psycopg2.extras
from lib.contracts_earnings_fcc import contract_earnings_fcc
from lib.pre_request import pre_request
from lib.planter_entity import planter_entity
from airflow.models import Variable
# Defaults applied to every operator in this DAG; any single task can
# override a value by passing the same keyword when it is constructed.
# Further operator-level options (queue, pool, priority_weight, end_date,
# wait_for_downstream, sla, execution_timeout, trigger_rule and the
# on_failure / on_success / on_retry / sla_miss callbacks) may be added here.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    # NOTE(review): this address looks like an e-mail-obfuscation
    # placeholder rather than a real mailbox -- confirm the intended value.
    email=['[email protected]'],
    # Both notification switches are off.
    email_on_failure=False,
    email_on_retry=False,
    # No retries are configured, so retry_delay has no effect while
    # retries stays at 0.
    retries=0,
    retry_delay=timedelta(minutes=5),
)
# DAG that builds the reporting "capture statistics" URL and passes it to
# lib.pre_request.pre_request on a fixed schedule.
with DAG(
    'pre_request_reporting_card',
    default_args=default_args,
    description='Pre-request the reporting card',
    # Cron expression: run every five minutes.
    schedule_interval='*/5 * * * *',
    start_date=datetime(2021, 1, 1),
    # Never let two runs overlap, and do not backfill the intervals between
    # start_date and now.
    max_active_runs=1,
    catchup=False,
    tags=['reporting', 'freetown'],
) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    def pre_request_job(ds, **kwargs):
        """Build the capture-statistics URL and hand it to ``pre_request``.

        The host comes from the Airflow Variable ``K8S_DOMAIN``. The query
        covers captures created from 1970-01-01 up to today's date.

        Args:
            ds: Accepted for the PythonOperator calling convention; not
                used by this job.
            **kwargs: Remaining context passed by the operator; not used.

        Returns:
            Always ``1``.

        Raises:
            KeyError: Raised by ``Variable.get`` when ``K8S_DOMAIN`` is not
                defined (no default is supplied).
            ValueError: If ``K8S_DOMAIN`` is defined but empty.
        """
        print("do pre request job:")

        # Variable.get without a default already fails for a missing
        # variable, so only an empty value needs an explicit check. A real
        # exception is used instead of `assert`, which is stripped under -O.
        k8s_domain = Variable.get("K8S_DOMAIN")
        if not k8s_domain:
            raise ValueError("Airflow Variable K8S_DOMAIN must not be empty")

        # NOTE(review): this is the worker's local wall-clock date, not the
        # run's logical date (`ds`) -- confirm that is intended.
        today = datetime.now().strftime("%Y-%m-%d")

        # Example of the resulting URL:
        # https://dev-k8s.treetracker.org/reporting/capture/statistics?capture_created_start_date=1970-01-01&capture_created_end_date=2022-05-07
        url = (
            f"https://{k8s_domain}/reporting/capture/statistics"
            f"?capture_created_start_date=1970-01-01"
            f"&capture_created_end_date={today}"
        )
        print(url)

        # presumably issues the HTTP request so the response is cached
        # ahead of real traffic -- verify against lib.pre_request.
        pre_request(url)
        return 1

    pre_request_reporting_card = PythonOperator(
        task_id='pre_request_reporting_card',
        python_callable=pre_request_job,
    )

    # The pre-request task runs first; print_date runs after it succeeds.
    pre_request_reporting_card >> t1