forked from mitre/linkage-agent-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
match.py
executable file
·117 lines (104 loc) · 3.81 KB
/
match.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python
import argparse
import logging
from pathlib import Path
import time
from pymongo import MongoClient
from dcctools.anonlink import Project, Results
from dcctools.config import Configuration
# Delay, in seconds, between successive run status checks; this value is
# passed to time.sleep() inside the polling loops of do_match().
SLEEP_TIME = 10.0
# Module-level logger. Its debug messages are only shown when the script is
# run with --verbose, which is what configures logging at DEBUG level.
log = logging.getLogger(__name__)
def parse_args():
    """Parse the command-line options of the matching script.

    Recognised options:
        --config   path to the configuration file (defaults to "config.json")
        --verbose  flag that turns on debugging output

    Returns:
        The ``argparse.Namespace`` holding ``config`` and ``verbose``.
    """
    cli = argparse.ArgumentParser()
    # Describe each option as (flag, keyword arguments) and register them in
    # the same order as they should appear in the --help output.
    options = (
        (
            "--config",
            {
                "default": "config.json",
                "help": 'Configuration file (default "config.json")',
            },
        ),
        (
            "--verbose",
            {
                "action": "store_true",
                "default": False,
                "help": "Show debugging output",
            },
        ),
    )
    for flag, settings in options:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def _wait_for_run(project, project_name):
    """Block until the current run of ``project`` has completed.

    Polls ``project.get_run_status()`` every ``SLEEP_TIME`` seconds, printing
    each status, and returns as soon as the reported state is "completed".

    Args:
        project: the project whose run was just started.
        project_name: name used only to label the error message.

    Raises:
        RuntimeError: if the run reports the "error" state. Without this
            check a failed run would never reach "completed" and the loop
            would poll forever.
    """
    while True:
        status = project.get_run_status()
        print(status)
        state = status.get("state")
        if state == "completed":
            return
        # NOTE(review): "error" is assumed to be the service's terminal
        # failure state -- confirm against the entity service API.
        if state == "error":
            raise RuntimeError(
                f"Run for project '{project_name}' failed: {status}"
            )
        time.sleep(SLEEP_TIME)


def _match_households(c, database):
    """Run the single household project and store its match groups."""
    log.debug("Processing households")
    with open(Path(c.household_schema)) as schema_file:
        household_schema = schema_file.read()
    project_name = "fn-phone-addr-zip"
    household_project = Project(
        project_name,
        household_schema,
        c.systems,
        c.entity_service_url,
        c.blocked,
    )
    household_project.start_project()
    for system in c.systems:
        household_project.upload_clks(
            system, c.get_household_clks_raw(system, project_name)
        )
    household_project.start_run(c.matching_threshold)
    _wait_for_run(household_project, project_name)
    result_json = household_project.get_results()
    results = Results(c.systems, project_name, result_json)
    results.insert_results(database.household_match_groups)


def _match_individuals(c, database):
    """Run one project per configured schema and store each project's groups."""
    log.debug("Processing individuals")
    if c.blocked:
        log.debug("Blocked, extracting CLKs and blocks")
        for system in c.systems:
            c.extract_clks(system)
            c.extract_blocks(system)
    # Load the schemas once; the count is only needed for the progress message.
    schemas = c.load_schema()
    schema_count = len(schemas)
    for iter_num, (project_name, schema) in enumerate(schemas.items(), start=1):
        project = Project(
            project_name, schema, c.systems, c.entity_service_url, c.blocked
        )
        project.start_project()
        for system in c.systems:
            if c.blocked:
                project.upload_clks_blocked(
                    system,
                    c.get_clk(system, project_name),
                    c.get_block(system, project_name),
                )
            else:
                project.upload_clks(system, c.get_clks_raw(system, project_name))
        project.start_run(c.matching_threshold)
        print("\n--- RUNNING ---\n")
        _wait_for_run(project, project_name)
        print("\n--- Getting results ---\n")
        result_json = project.get_results()
        results = Results(c.systems, project_name, result_json)
        print(f"Matching groups for system {iter_num} of {schema_count}")
        results.insert_results(database.match_groups)


def do_match(c):
    """Run the matching projects described by ``c`` and store the results.

    When ``c.household_match`` is set, a single household project is run and
    its results are inserted into the ``household_match_groups`` collection.
    Otherwise one project is run for every schema returned by
    ``c.load_schema()`` and the results go into ``match_groups``. Both
    collections live in the ``linkage_agent`` database at ``c.mongo_uri``.

    Args:
        c: the loaded configuration object.

    Raises:
        RuntimeError: if any run reports the "error" state instead of
            completing.
    """
    client = MongoClient(c.mongo_uri)
    try:
        database = client.linkage_agent
        if c.household_match:
            _match_households(c, database)
        else:
            _match_individuals(c, database)
    finally:
        # Release the MongoDB connection even when a run or upload fails.
        client.close()
if __name__ == "__main__":
    # Script entry point: read the command line, optionally enable debug
    # logging, then run the matching described by the configuration file.
    args = parse_args()
    if args.verbose:
        # Bare "%(message)s" keeps debug lines free of level/logger prefixes.
        logging.basicConfig(format="%(message)s", level=logging.DEBUG)
    do_match(Configuration(args.config))