-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.py
76 lines (61 loc) · 2.08 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from robocorp import workitems
from robocorp.tasks import task
from RPA.HTTP import HTTP
from RPA.JSON import JSON
from RPA.Tables import Tables
http = HTTP()
json = JSON()
table = Tables()
TRAFFIC_JSON_FILE_PATH = "output/traffic.json"
# JSON data keys
COUNTRY_KEY = "SpatialDim"
YEAR_KEY = "TimeDim"
RATE_KEY = "NumericValue"
GENDER_KEY = "Dim1"
@task
def produce_traffic_data():
"""
Inhuman Insurance, Inc. Artificial Intelligence System automation.
Produces traffic data work items.
"""
http.download(
url="https://github.com/robocorp/inhuman-insurance-inc/raw/main/RS_198.json",
target_file=TRAFFIC_JSON_FILE_PATH,
overwrite=True,
)
traffic_data = load_traffic_data_as_table()
filtered_data = filter_and_sort_traffic_data(traffic_data)
filtered_data = get_latest_data_by_country(filtered_data)
payloads = create_work_item_payloads(filtered_data)
save_work_item_payloads(payloads)
def load_traffic_data_as_table():
json_data = json.load_json_from_file(TRAFFIC_JSON_FILE_PATH)
return table.create_table(json_data["value"])
def filter_and_sort_traffic_data(data):
max_rate = 5.0
both_genders = "BTSX"
table.filter_table_by_column(data, RATE_KEY, "<", max_rate)
table.filter_table_by_column(data, GENDER_KEY, "==", both_genders)
table.sort_table_by_column(data, YEAR_KEY, False)
return data
def get_latest_data_by_country(data):
data = table.group_table_by_column(data, COUNTRY_KEY)
latest_data_by_country = []
for group in data:
first_row = table.pop_table_row(group)
latest_data_by_country.append(first_row)
return latest_data_by_country
def create_work_item_payloads(traffic_data):
payloads = []
for row in traffic_data:
payload = dict(
country=row[COUNTRY_KEY],
year=row[YEAR_KEY],
rate=row[RATE_KEY]
)
payloads.append(payload)
return payloads
def save_work_item_payloads(payloads):
for payload in payloads:
variables = dict(traffic_data=payload)
workitems.outputs.create(variables)