from collections import OrderedDict
from collections.abc import MutableMapping
import json
import os
import re
import csv
from decimal import Decimal
from xmlschema import XMLSchema
# Modified from:
# https://github.com/IATI/IATI-Stats/blob/1d20ed1e/stats/common/decorators.py#L5-L13
def memoize(f):
def wrapper(self, key):
if not hasattr(self, '__cache'):
self.__cache = {}
if key in self.__cache:
return self.__cache[key]
res = f(self, key)
if type(res) is not JSONDir:
# don't cache JSONDirs
self.__cache[key] = res
return res
return wrapper
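
# Illustrative sketch (not executed): memoize caches per-key results on the
# instance, so repeated lookups of the same key avoid re-reading the JSON file.
# JSONDir values are deliberately not cached, to keep memory usage low.
#
#     d = JSONDir('./stats-calculated/current/aggregated')
#     d['activities']   # first access parses activities.json from disk
#     d['activities']   # second access is served from the in-memory cache
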
class JSONDir(MutableMapping):
"""Produces an object, to be used to access JSON-formatted publisher data and return
this as an ordered dictionary (with nested dictionaries, if appropriate).
Use of this class removes the need to load large amounts of data into memory.
"""
def __init__(self, folder):
"""Set the path of the folder being accessed as an attribute to an instance of
the object.
"""
self.folder = folder
def __len__(self):
return len(self.keys())
    def __delitem__(self, key):
        # Present only to satisfy the MutableMapping interface; JSONDir is
        # effectively read-only and deletion is not expected to be used.
        try:
            del self.folder[key]
        except KeyError:
            pass
def __repr__(self):
return '{}, JSONDIR({})'.format(super(JSONDir, self).__repr__(), self.__dict__)
    def __setitem__(self, key, value):
        # Present only to satisfy the MutableMapping interface; the underlying
        # JSON files are never written to.
        super(JSONDir, self).__setitem__(key, value)
@memoize
def __getitem__(self, key):
"""Define how variables are gathered from the raw JSON files and then parsed into
the OrderedDict that will be returned.
Note:
try-except should be used around file operations rather than checking before-hand
"""
if os.path.exists(os.path.join(self.folder, key)):
# The data being sought is a directory
data = JSONDir(os.path.join(self.folder, key))
elif os.path.exists(os.path.join(self.folder, key + '.json')):
# The data being sought is a json file
with open(os.path.join(self.folder, key + '.json')) as fp:
data = json.load(fp, object_pairs_hook=OrderedDict)
# Deal with publishers who had an old registry ID
# If this publisher had at least one old ID in the past
if (self.get_publisher_name() in get_registry_id_matches().values()) and ('gitaggregate' in self.folder):
# Perform the merging
                # Loop over the set of changed registry IDs
for previous_id, current_id in get_registry_id_matches().items():
folder = self.folder
previous_path = os.path.join(folder.replace(current_id, previous_id), key + '.json')
# If this publisher has had an old ID and there is data for it
if (current_id == self.get_publisher_name()) and os.path.exists(previous_path):
# Get the corresponding value for the old publisher ID, and merge with the existing value for this publisher
with open(previous_path) as old_fp:
old_pub_data = json.load(old_fp, object_pairs_hook=OrderedDict)
deep_merge(data, old_pub_data)
# FIXME i) Should deep_merge attempt to sort this ordereddict ii) Should there be an attempt to aggregate/average conflicting values?
else:
# No value found as either a folder or json file
raise KeyError(key)
return data
def keys(self):
"""Method to return a list of keys that are contained within the data folder that
is being accessed within this instance.
"""
return [x[:-5] if x.endswith('.json') else x for x in os.listdir(self.folder)]
def __iter__(self):
"""Custom iterable, to iterate over the keys that are contained within the data
folder that is being accessed within this instance.
"""
return iter(self.keys())
def get_publisher_name(self):
"""Find the name of the publisher that this data relates to.
        Note: this is a super hacky way to do this - a prize is available if a better way is found!
"""
# Get a list of the parts that are contained within this filepath
path = os.path.normpath(self.folder)
path_components = path.split(os.sep)
        # Loop over this list and return the first path component that appears in the list of publishers with calculated stats
for x in path_components:
if x in JSONDir('./stats-calculated/current/aggregated-publisher').keys():
return x
        # If we got to the end of the loop without a match, this folder does not relate to a single publisher
return None
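
# Illustrative usage (not executed here): a JSONDir behaves like a read-only
# nested mapping over a directory of JSON files. The publisher id below is a
# made-up example of the layout this module reads.
#
#     pubs = JSONDir('./stats-calculated/current/aggregated-publisher')
#     list(pubs)                            # publisher ids (one sub-folder each)
#     pubs['some-publisher']                # a nested JSONDir for that publisher
#     pubs['some-publisher']['activities']  # parsed from activities.json
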
def get_publisher_stats(publisher, stats_type='aggregated'):
"""Function to obtain current data for a given publisher.
Returns: A JSONDir object for the publisher, or an empty dictionary if the publisher
is not found.
"""
try:
return JSONDir('./stats-calculated/current/{0}-publisher/{1}'.format(stats_type, publisher))
except IOError:
return {}
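
# Illustrative usage (not executed): fetch the current aggregated stats for a
# single publisher; the publisher id here is hypothetical.
#
#     stats = get_publisher_stats('some-publisher')
#     stats['activities'] if stats else 0
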
def get_registry_id_matches():
"""Returns a dictionary of publishers who have modified their registry ID
Returns: Dictionary, where the key is the old registry ID, and the corresponding
value is the registry ID that data should be mapped to
"""
# Load registry IDs for publishers who have changed their registry ID
with open('registry_id_relationships.csv') as f:
reader = csv.DictReader(f)
        # Load this data into a dictionary
registry_matches = {
row['previous_registry_id']: row['current_registry_id']
for row in reader
}
return registry_matches
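
# The CSV is expected to have 'previous_registry_id' and 'current_registry_id'
# columns (per the DictReader usage above). Illustrative rows, with made-up ids:
#
#     previous_registry_id,current_registry_id
#     oldorg,neworg
#
# would yield {'oldorg': 'neworg'}, i.e. data published under 'oldorg' is
# mapped onto 'neworg'.
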
def deep_merge(obj1, obj2):
"""Merges two OrderedDict objects with an unknown number of nested levels
Input: obj1 - OrderedDict to be used as the base object
Input: obj2 - OrderedDict to be merged into obj1
Returns: Nothing, but obj1 will contain the full data
"""
# Iterate through keys
for key in obj1.copy():
        # If this is a value, we've hit the bottom: copy all of obj2 into obj1
if type(obj1[key]) is not OrderedDict:
for key2 in obj2:
# If there exists a dict at that key, make sure it's not erased
if key2 in obj1:
if type(obj1[key2]) is not OrderedDict:
                        # Behaviour for duplicate keys can be changed here
                        # (currently the value from obj2 wins)
obj1[key2] = obj2[key2]
else:
obj1[key2] = obj2[key2]
# If it's a dictionary we need to go deeper, by running this function recursively
else:
if key in obj2:
deep_merge(obj1[key], obj2[key])
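
# Illustrative behaviour (not executed): leaf values in obj2 overwrite those in
# obj1, new keys are added, and nested OrderedDicts are merged recursively.
#
#     a = OrderedDict([('2019', 5), ('2020', 3)])
#     b = OrderedDict([('2020', 7), ('2021', 2)])
#     deep_merge(a, b)
#     # a is now OrderedDict([('2019', 5), ('2020', 7), ('2021', 2)])
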
current_stats = {
'aggregated': JSONDir('./stats-calculated/current/aggregated'),
'aggregated_file': JSONDir('./stats-calculated/current/aggregated-file'),
'inverted_publisher': JSONDir('./stats-calculated/current/inverted-publisher'),
'inverted_file': JSONDir('./stats-calculated/current/inverted-file'),
'inverted_file_publisher': JSONDir('./stats-calculated/current/inverted-file-publisher'),
'download_errors': []
}
ckan_publishers = JSONDir('./data/ckan_publishers')
github_issues = JSONDir('./data/github/publishers')
with open('./stats-calculated/ckan.json') as fp:
    ckan = json.load(fp, object_pairs_hook=OrderedDict)
dataset_to_publisher_dict = {
dataset: publisher
for publisher, publisher_dict in ckan.items()
for dataset in publisher_dict.keys()
}
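
# ckan.json maps publisher -> {dataset: ...}; the comprehension above inverts
# this to dataset -> publisher. Illustrative, with made-up names:
#     {'pub-a': {'pub-a-activities': {...}}} becomes {'pub-a-activities': 'pub-a'}
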
with open('./stats-calculated/metadata.json') as fp:
    metadata = json.load(fp, object_pairs_hook=OrderedDict)
with open('./data/downloads/errors') as fp:
for line in fp:
if line != '.\n':
current_stats['download_errors'].append(line.strip('\n').split(' ', 3))
sources105 = [
'./data/schemas/1.05/iati-activities-schema.xsd',
'./data/schemas/1.05/iati-organisations-schema.xsd']
sources203 = [
'./data/schemas/2.03/iati-activities-schema.xsd',
'./data/schemas/2.03/iati-organisations-schema.xsd']
schema105 = XMLSchema(sources105)
schema203 = XMLSchema(sources203)
def is_valid_element(path):
    """Return True if the given path can be resolved as an element in either the 2.03 or 1.05 schema."""
try:
if schema203.get_element(None, path=path):
return True
except AttributeError:
pass
try:
if schema105.get_element(None, path=path):
return True
except AttributeError:
pass
return False
def transform_codelist_mapping_keys(codelist_mapping):
# Perform the same transformation as https://github.com/IATI/IATI-Stats/blob/d622f8e88af4d33b1161f906ec1b53c63f2f0936/stats.py#L12
codelist_mapping = {k: v for k, v in codelist_mapping.items() if not k.startswith('//iati-organisation')}
codelist_mapping = {re.sub(r'^\/\/iati-activity', './', k): v for k, v in codelist_mapping.items()}
codelist_mapping = {re.sub(r'^\/\/', './/', k): v for k, v in codelist_mapping.items()}
return codelist_mapping
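
# Illustrative transformations (not executed) of the XPath-style mapping keys;
# the element names are examples only, the point is the prefix rewriting:
#
#     '//iati-organisation/...'        -> dropped entirely
#     '//iati-activity/sector/@code'   -> './/sector/@code'
#     '//other-element/@code'          -> './/other-element/@code'
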
def create_codelist_mapping(major_version):
    """Build a mapping from (transformed) element/attribute paths to codelist names for the given major version."""
codelist_mapping = {}
    with open('data/IATI-Codelists-{}/out/clv2/mapping.json'.format(major_version)) as fp:
        mapping_data = json.load(fp)
    for x in mapping_data:
if 'condition' in x:
pref, attr = x['path'].rsplit('/', 1)
path = '{0}[{1}]/{2}'.format(
pref, x['condition'], attr)
else:
path = x['path']
codelist_mapping[path] = x['codelist']
return transform_codelist_mapping_keys(codelist_mapping)
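
# Each entry of mapping.json is expected to look roughly like (illustrative):
#     {"path": "//iati-activity/sector/@code", "codelist": "Sector"}
# optionally with a "condition" that is folded into the path as an XPath
# predicate, e.g. a condition "@vocabulary = '1'" would give
# "//iati-activity/sector[@vocabulary = '1']/@code" before the key transform.
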
MAJOR_VERSIONS = ['2', '1']
codelist_mapping = {v: create_codelist_mapping(v) for v in MAJOR_VERSIONS}
# Create a big dictionary of all codelist values by version and codelist name
codelist_sets = {
major_version: {
cname: set(c['code'] for c in codelist['data']) for cname, codelist in JSONDir('data/IATI-Codelists-{}/out/clv2/json/en/'.format(major_version)).items()
} for major_version in MAJOR_VERSIONS}
codelist_lookup = {
major_version: {
cname: {c['code']: c for c in codelist['data']} for cname, codelist in JSONDir('data/IATI-Codelists-{}/out/clv2/json/en/'.format(major_version)).items()
} for major_version in MAJOR_VERSIONS}
# Simple lookup to map publisher id to a publisher's given name (title)
publisher_name = {publisher: publisher_json['result']['title'] for publisher, publisher_json in ckan_publishers.items()}
# Create a list of (title, publisher id) tuples ordered by publisher title - this allows lists of publishers to be displayed in alphabetical order
publishers_ordered_by_title = [(publisher_name[publisher], publisher) for publisher in current_stats['inverted_publisher']['activities'] if publisher in publisher_name]
publishers_ordered_by_title.sort(key=lambda x: (x[0]).lower())
# List of publishers who report all their activities as a secondary publisher
secondary_publishers = [publisher for publisher, stats in JSONDir('./stats-calculated/current/aggregated-publisher').items()
if int(stats['activities']) == len(stats['activities_secondary_reported']) and int(stats['activities']) > 0]
try:
    with open('data/dac2012.csv') as fp:
        dac2012 = {x[0]: Decimal(x[1].replace(',', '')) for x in csv.reader(fp)}
except IOError:
dac2012 = {}
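
# dac2012.csv is expected to contain rows of (name, amount) where the amount
# may use thousands separators. Illustrative row (made-up figures):
#     Some Donor,"1,234.5"   ->   dac2012['Some Donor'] == Decimal('1234.5')
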
def make_slugs(keys):
out = {'by_slug': {}, 'by_i': {}}
for i, key in enumerate(keys):
        # Build a URL-safe slug: expand the xml namespace prefix, turn '/' into
        # '_', drop any other namespace braces, then remove unsafe characters
        # and trim leading/trailing underscores.
        slug = key.replace('{http://www.w3.org/XML/1998/namespace}', 'xml:').replace('/', '_')
        slug = re.sub(r'{[^}]*}', '', slug)
        slug = re.sub(r'[^a-zA-Z0-9:@\-_]', '', slug).strip('_')
while slug in out['by_slug']:
slug += '_'
out['by_slug'][slug] = i
out['by_i'][i] = slug
return out
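
# Illustrative slugs (not executed, example keys only): '/' becomes '_', the xml
# namespace is shortened to 'xml:', and clashes gain trailing underscores.
#
#     make_slugs(['iati-activity/title',
#                 'iati-activity/@{http://www.w3.org/XML/1998/namespace}lang'])
#     # -> by_slug == {'iati-activity_title': 0, 'iati-activity_@xml:lang': 1}
#     #    by_i is the inverse mapping, index -> slug
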
slugs = {
'codelist': {major_version: (
make_slugs(current_stats['inverted_publisher']['codelist_values_by_major_version'][major_version].keys())
if major_version in current_stats['inverted_publisher']['codelist_values_by_major_version']
else make_slugs([])
) for major_version in MAJOR_VERSIONS},
'element': make_slugs(current_stats['inverted_publisher']['elements'].keys()),
'org_type': make_slugs(['accountable_org', 'extending_org', 'funding_org', 'implementing_org', 'provider_org', 'receiver_org']),
}