-
Notifications
You must be signed in to change notification settings - Fork 5
/
0.icp_query.py
138 lines (119 loc) · 6.41 KB
/
0.icp_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
'''
Author : S1g0day
Creat time : 2024/3/15 17:27
Modification time: 2024/8/10 14:48
Introduce : 通过接口查询域名或公司备案
'''
import yaml
from datetime import datetime
from argparse import ArgumentParser
from lib.logo import logo
from lib.Requests_func import make_request
from lib.log_functions import write_log_success, write_log_error, write_log_warning
def Page_traversal_temporary(id, total, params, query_url, req_list):
# 一页显示所有数据
domainId_list = []
params['pageSize'] = total
req_page_unitName = make_request(query_url, params, req_list[0]['unitName'])
if req_page_unitName:
unitName_list = req_page_unitName['params']['list']
for item in unitName_list:
if item.get('domain') and item.get('unitName'):
success_output = f"id:{id}\tdomainId:{item['domainId']}\tunitName:{item['unitName']}\tnatureName:{item['natureName']}\tdomain:{item['domain']}\tmainLicence:{item['mainLicence']}\tserviceLicence:{item['serviceLicence']}\tupdateRecordTime:{item['updateRecordTime']}"
if item['domainId'] and item['domainId'] not in domainId_list:
domainId_list.append(item['domainId'])
success_log_file_path = 'log/success.log'
write_log_success(success_log_file_path, success_output)
else:
write_log_warning("unitName or domain is None...")
else:
write_log_warning(f"No unitName_list found for {req_list}. Skipping...")
return domainId_list
def query_from(query_url, search_data, id):
params = {
'search': search_data,
'pageNum': 1,
'pageSize': 10,
}
req = make_request(query_url, params, search_data)
# 检查req是否为字典类型或是否包含所需的键
if req and isinstance(req, dict) and 'params' in req:
try:
req_list = req['params']['list']
if req_list and isinstance(req_list, list) and len(req_list) > 0:
params['search'] = req_list[0]['unitName']
req_unitName = make_request(query_url, params, params['search'])
if req_unitName and isinstance(req_unitName, dict) and 'params' in req_unitName:
total = req_unitName['params']['total']
domain_list = Page_traversal_temporary(id, total, params, query_url, req_list)
if domain_list and isinstance(domain_list, list) and total != len(domain_list):
error_icp_output = f"{search_data} 应提取出 {total} 条信息,实际为 {len(domain_list)} 条"
error_icp_log_file_path = 'log/error_icp.log'
write_log_error(error_icp_log_file_path, error_icp_output)
return total
except Exception as e:
error_occurred_output = f"{search_data} an error occurred: {str(e)}"
error_occurred_log_file_path = 'log/error_occurred.log'
write_log_error(error_occurred_log_file_path, error_occurred_output, search_data)
no_req_list_output = f"Does not have req_list: {search_data}"
no_req_list_log_file_path = 'log/no_req_list.log'
write_log_error(no_req_list_log_file_path, no_req_list_output, search_data)
# 根据您的需求,如果if条件不满足,最多重新运行10次
return None
def query_from_file(query_url, filename, start_index):
with open(filename, 'r', encoding='utf-8') as file:
data_list = file.readlines()
total_domains = len(data_list)
if start_index < 1:
start_index = 1
write_log_warning("输入异常, start_index 重置为 1")
elif start_index > total_domains:
start_index = total_domains
write_log_warning(f"输入异常, start_index 重置为 {total_domains}")
for index in range(start_index-1, total_domains):
data = data_list[index].strip()
if data:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')
Processing_Domain_output = f'Time: {current_time}, Schedule: {index+1}/{total_domains}, Domain: {data}'
print("\n")
write_log_warning(f"Processing {Processing_Domain_output}")
print("\n")
total = query_from(query_url, data, index+1)
if total is not None:
Processing_Domain_output += f', Total: {total}'
Processing_Domain_log_file_path = 'log/processing_Domain.log'
write_log_success(Processing_Domain_log_file_path, Processing_Domain_output)
if __name__ == '__main__':
logo()
parser = ArgumentParser()
parser.add_argument("-d", dest="query_url", help="请输入测试平台地址")
parser.add_argument("-u", dest="domain", help="请输入目标")
parser.add_argument("-uf", dest="domains_file", help="请输入目标文件")
parser.add_argument("-s", dest="start_index", type=int, default="1", help="请输入起始位置,第一个数据的下标为0")
args = parser.parse_args()
# Load YAML configuration
push_config = yaml.safe_load(open("config/config.yaml", "r", encoding="utf-8").read())
if args.query_url:
if args.domain:
# 执行查询
query_from(args.query_url, args.domain, args.start_index)
elif args.domains_file:
write_log_warning(f"Query_urls: {args.query_url}")
write_log_warning(f"Domains_file: {args.domains_file}")
query_from_file(args.query_url, args.domains_file, args.start_index)
else:
write_log_warning(f"Query_urls: {args.query_url}")
write_log_warning(f"Domains_file: {push_config['domains_file']}")
query_from_file(args.query_url, push_config['domains_file'], args.start_index)
else:
if args.domain:
# 执行查询
query_from(push_config['query_url'], args.domain, args.start_index)
elif args.domains_file:
write_log_warning(f"Query_urls: {push_config['query_url']}")
write_log_warning(f"Domains_file: {args.domains_file}")
query_from_file(push_config['query_url'], args.domains_file, args.start_index)
else:
write_log_warning(f"Query_urls: {push_config['query_url']}")
write_log_warning(f"Domains_file: {push_config['domains_file']}")
query_from_file(push_config['query_url'], push_config['domains_file'], args.start_index)