-
Notifications
You must be signed in to change notification settings - Fork 3
/
rt_comparator.py
81 lines (65 loc) · 3.6 KB
/
rt_comparator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import datacompy
import logging
import concurrent.futures
def compare_routing_tables(rt_sut, rt_oracle, system, simple_ospf_nodes=None):
"""
Compare the routing tables computed by the system under test to the oracle routing tables using datacompy.
Routing tables should be in the dataframe format generated by Metha.
:param rt_sut: Routing table of the system under test
:param rt_oracle: Routing table of the oracle
:param system: System which is being tested
:param simple_ospf_nodes: Nodes which do not supply detailed ospf names, replaces ospfIA, ospfE1, and ospfE2 by ospf if present
:return: Datacompy Compare object describing the comparison result
"""
it = system.compare_items
rt_sut_c = rt_sut[rt_sut['Protocol'] != 'local'].filter(items=it)
rt_sut_c.loc[rt_sut_c['Protocol'] == 'ibgp', 'Protocol'] = 'bgp'
rt_sut_c.loc[:, 'Node'] = rt_sut_c['Node'].map(str.lower)
rt_sut_c.loc[:, 'Next_Hop'] = rt_sut_c['Next_Hop'].map(lambda x: str.lower(x) if x is not None else x)
if simple_ospf_nodes is not None:
for node in simple_ospf_nodes:
rt_sut_c.loc[((rt_sut_c['Protocol'] == 'ospfIA') & (rt_sut_c['Node'] == str.lower(node))), 'Protocol'] = 'ospf'
rt_sut_c.loc[((rt_sut_c['Protocol'] == 'ospfE1') & (rt_sut_c['Node'] == str.lower(node))), 'Protocol'] = 'ospf'
rt_sut_c.loc[((rt_sut_c['Protocol'] == 'ospfE2') & (rt_sut_c['Node'] == str.lower(node))), 'Protocol'] = 'ospf'
rt_sut_c = rt_sut_c.drop_duplicates()
rt_sut_c = rt_sut_c.astype('object')
rt_oracle_c = rt_oracle[rt_oracle['Protocol'] != 'local'].filter(items=it)
rt_oracle_c = system.transform_rt(rt_oracle_c)
rt_oracle_c.loc[:, 'Node'] = rt_oracle_c['Node'].map(str.lower)
rt_oracle_c.loc[:, 'Next_Hop'] = rt_oracle_c['Next_Hop'].map(lambda x: str.lower(x) if x is not None else x)
rt_oracle_c = rt_oracle_c.drop_duplicates()
compare = datacompy.Compare(
rt_sut_c,
rt_oracle_c,
join_columns=it,
df1_name='SUT',
df2_name='GNS3'
)
return compare
def run_comparison(path, gp, adj, system):
"""
Run provided configs in both the oracle as well as the system under test and compare them.
Write a report of the comparison as well as the computed routing tables to disk in the current test folder.
:param path: Current test folder, must contain configs
:param gp: GNS3 project
:param adj: Adjacency information
:param system: System under test
:return: Datacompy Compare object of the comparison result as well as optionally dataframes describing parser errors
and initialization issues of the test case
"""
logger = logging.getLogger('network-testing')
logger.info(f'Running test case in {path}')
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
oracle = executor.submit(gp.generate_routing_table, adj)
sut = executor.submit(system.run, path, adj)
(rt_sut, parse_err, init_issues) = sut.result(timeout=180)
rt_oracle = oracle.result(timeout=180)
simple_ospf_nodes = [name for name in gp.nodes if not gp.nodes[name].detailed_ospf]
compare = compare_routing_tables(rt_sut, rt_oracle, system, simple_ospf_nodes=simple_ospf_nodes)
with open(f'{path}report_{str.lower(str(system))}.txt', 'w') as f:
f.write(compare.report())
with open(f'{path}oracle_routing_table.csv', 'w') as f:
f.write(rt_oracle.to_csv(index=False))
with open(f'{path}{str.lower(str(system))}_routing_table.csv', 'w') as f:
f.write(rt_sut.to_csv(index=False))
return compare, parse_err, init_issues