-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
113 lines (98 loc) · 4.05 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
'''Data storage and retrieval interface'''
import csv
from enum import Enum
from io import TextIOWrapper
import json
import sys
from typing import List
# Top-level keys of the results dictionary held in ``Db.DB``.
SUCCESS_KEY = "succeed_domains"
PRIVACY_KEY = "private_domains"
FAILED_KEY = "failed_domains"


class Db:
    '''Provides an interface to store/retrieve results

    Results live in the ``DB`` dictionary under three optional top-level keys:

    * ``SUCCESS_KEY`` -> ``{country: [record, ...]}``
    * ``PRIVACY_KEY`` -> ``[record, ...]``
    * ``FAILED_KEY``  -> ``{reason: [record, ...]}``

    where every record is ``{"domain": ..., "nameservers": ...}``. A key is
    only created once the first matching result has been recorded.
    '''

    class Format(Enum):
        '''Output formats'''
        JSON = 1
        CSV = 2

    # NOTE(review): this is a class attribute, so every Db instance reads and
    # mutates the same dictionary. Kept as-is because callers may rely on the
    # shared state -- confirm before making it per-instance.
    DB = {}

    def record_country(self, domain, country, nameservers: List[str] = None):
        '''Record a successful lookup of ``domain`` under ``country``.

        ``country`` is used as the grouping key as-is (``None`` is accepted and
        is rendered as "N/A" in CSV output). ``nameservers`` is stored
        unchanged and may be ``None``.
        '''
        by_country = self.DB.setdefault(SUCCESS_KEY, {})
        by_country.setdefault(country, []).append(
            {"domain": domain, "nameservers": nameservers})

    def record_flagged(self, domain: str, nameservers: List[str] = None):
        '''Record a privacy flagged domain.

        ``nameservers`` is stored unchanged and may be ``None``.
        '''
        self.DB.setdefault(PRIVACY_KEY, []).append(
            {"domain": domain, "nameservers": nameservers})

    def record_failed(self, domain, reason, nameservers: List[str] = None):
        '''Record a failed domain lookup, grouped by ``reason``.

        ``nameservers`` is stored unchanged and may be ``None``.
        '''
        by_reason = self.DB.setdefault(FAILED_KEY, {})
        by_reason.setdefault(reason, []).append(
            {"domain": domain, "nameservers": nameservers})

    def get_failed_domain_count(self):
        '''Return the number of failed domains recorded across all reasons.

        Returns 0 when no failure has been recorded yet.
        '''
        failed = self.DB.get(FAILED_KEY, {})
        return sum(len(records) for records in failed.values())

    def __str__(self):
        return str(self.DB)

    def output_results(self, output_loc: TextIOWrapper = None, fmt: Format = Format.JSON):
        '''Outputs the results stored in the DB.

        ``output_loc`` is a writable text stream; when it is ``None`` the
        results go to standard output. ``fmt`` selects the serialisation; a
        value that is neither ``Format.JSON`` nor ``Format.CSV`` produces no
        output.
        '''
        if fmt == Db.Format.JSON:
            self._output_results_json(output_loc)
        elif fmt == Db.Format.CSV:
            self._output_results_csv(output_loc)

    def _output_results_json(self, output_loc: TextIOWrapper = None):
        '''Outputs the results stored in the DB as JSON (4-space indent).

        Printed to standard output when ``output_loc`` is ``None``, otherwise
        written to ``output_loc``.
        '''
        results = json.dumps(self.DB, indent=4)
        if output_loc is None:
            print(results)
        else:
            output_loc.write(results)

    def _output_results_csv(self, output_loc: TextIOWrapper = None):
        '''Outputs the results stored in the DB as CSV.

        One row per recorded domain with the columns ``domain``, ``private``,
        ``country`` and ``nameservers`` (nameservers joined with "|"). Rows are
        emitted in the order: successful lookups, privacy flagged domains,
        failed lookups. Written to standard output when ``output_loc`` is
        ``None``.
        '''
        def nameservers_to_str(record):
            # A record without nameservers is rendered as an empty cell.
            if record["nameservers"] is not None:
                return "|".join(record["nameservers"])
            return ""

        def to_row(record, private, country):
            return {
                "domain": record["domain"],
                "private": private,
                "country": country,
                "nameservers": nameservers_to_str(record),
            }

        fieldnames = ["domain", "private", "country", "nameservers"]
        data = []

        for country, records in self.DB.get(SUCCESS_KEY, {}).items():
            label = "N/A" if country is None else country
            data.extend(to_row(record, False, label) for record in records)

        data.extend(
            to_row(record, True, "Privacy Protected")
            for record in self.DB.get(PRIVACY_KEY, []))

        # Failed lookups are stored as {reason: [record, ...]}, so the records
        # are one level down; iterating the mapping directly yields the reason
        # strings rather than the records.
        for records in self.DB.get(FAILED_KEY, {}).values():
            data.extend(to_row(record, False, "Failed") for record in records)

        # Resolve the default stream before building the writer: the writer
        # binds to the stream it is constructed with.
        if output_loc is None:
            output_loc = sys.stdout
        writer = csv.DictWriter(output_loc, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)