from requests import session
from json import loads, dumps
from localrepos import repo
from logs import log, dbg
from time import sleep
from prettytable import PrettyTable
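class client(object):
    """HTTP client for the scanning API.

    Workflow (see ``runscan``): ask the API for an upload target, PUT the
    repository tarball there, poll the scan task until it reaches a terminal
    state, then fetch and print the result. Every request goes through one
    ``requests`` session that carries the repository auth key in the
    ``Repository`` header.
    """

    # Task states reported by /api/scans/<id> at which polling stops.
    TERMINAL_STATES = ("SUCCESS", "FAILURE", "REVOKED")
    # Per-request timeout in seconds; without one a stalled API blocks forever.
    # Override per instance (``c.timeout = ...``) when uploads are large.
    timeout = 60
    # Seconds to sleep between two status polls.
    poll_interval = 1

    def __init__(self, authkey, host):
        """Create a client for ``host`` (base URL) authenticating with ``authkey``."""
        self.host = host
        self.authkey = authkey
        self.prepareSession()

    def _url(self, path):
        """Join ``path`` (starting with '/') to the base host, tolerating a trailing slash."""
        return self.host.rstrip("/") + path

    def prepareSession(self):
        """Open the HTTP session and attach the auth key as the ``Repository`` header."""
        self.session = session()
        self.session.headers["Repository"] = self.authkey
        # The header value is the credential: log that it was set, never the value.
        dbg("client.prepareSession", "Repository header set (value redacted)")

    def getUploadInfo(self):
        """Ask the API where the tarball for ``self.repo`` should be uploaded.

        Requires ``runscan`` to have set ``self.repo`` first.

        Returns:
            The decoded JSON body; ``type`` and ``url`` are the keys read here.

        Raises:
            RuntimeError: when the API answers with anything but HTTP 200
                (code and body go to the debug log).
        """
        r = self.session.get(
            self._url("/api/repositories/{}/upload".format(self.repo.uuid)),
            timeout=self.timeout,
        )
        if r.status_code != 200:
            log("client.getUploadInfo", "Something went wrong")
            dbg("client.getUploadInfo", "CODE: {}".format(r.status_code))
            dbg("client.getUploadInfo", r.text)
            raise RuntimeError("Check parameters")
        data = loads(r.text)
        log("client.getUploadInfo", "type: {}, url: {}".format(data["type"], data["url"]))
        return data

    def doUpload(self, uploadInfo):
        """PUT the repository tarball to ``uploadInfo["url"]``.

        Returns:
            The ``task_id`` from the API's JSON reply.
        """
        handle = self.repo.getReadHandle()
        try:
            r = self.session.put(uploadInfo["url"], data=handle, timeout=self.timeout)
        finally:
            # getReadHandle() presumably returns an open file object — close it
            # when it offers close(); harmless otherwise. TODO confirm in localrepos.
            close = getattr(handle, "close", None)
            if close is not None:
                close()
        data = loads(r.text)
        log("client.doUpload", data)
        return data["task_id"]

    def _checkStatus(self, taskid):
        """Return the current ``status`` string of scan task ``taskid``."""
        r = self.session.get(self._url("/api/scans/{}".format(taskid)), timeout=self.timeout)
        dbg("client._checkStatus", r.text)
        data = loads(r.text)
        return data["status"]

    def waitUntilScanIsDone(self, taskid):
        """Poll ``taskid`` every ``poll_interval`` seconds until it is terminal.

        Returns:
            The terminal status, one of ``TERMINAL_STATES``.
        """
        while (status := self._checkStatus(taskid)) not in self.TERMINAL_STATES:
            log("client.waitUntilScanIsDone", "waiting..")
            sleep(self.poll_interval)
        return status

    def getScanResult(self, taskid):
        """Return the raw (JSON text) result of scan task ``taskid``."""
        r = self.session.get(self._url("/api/scans/{}/result".format(taskid)), timeout=self.timeout)
        dbg("client.getScanResult", r.text)
        return r.text

    def runscan(self, path, uuid):
        """Tar ``path``, upload it as repository ``uuid``, wait for the scan, print a summary.

        Returns:
            ``(True, data)`` with the decoded result on success, ``(False, None)``
            otherwise. Any exception raised while uploading or polling is logged
            and reported as a failure; the local tarball is deleted in every case.
        """
        self.repo = repo(path, uuid)
        self.repo.createTar()
        status = "FAILURE"
        taskid = None
        try:
            uploadInfo = self.getUploadInfo()
            taskid = self.doUpload(uploadInfo)
            status = self.waitUntilScanIsDone(taskid)
        except Exception as e:
            # Best-effort by design: the caller only sees (False, None).
            dbg("client.runscan", "Got exception {} during scan".format(e))
        finally:
            self.repo.deleteTar()
        if status != "SUCCESS":
            log("client.runscan", "Could not run scan, check your parameters or API logs")
            return False, None
        data = loads(self.getScanResult(taskid))
        self._printSummary(data)
        return True, data

    def _printSummary(self, data):
        """Print each finding and a per-level count table from a decoded scan result."""
        vulnerabilities = data["vulnerabilities"]
        for vuln in vulnerabilities:
            print(vuln['tool'], vuln['details'])
        # Report the real number of findings (the old counter was never incremented).
        log("client.runscan", "Result for scan ({} vulnerabilities were found):".format(len(vulnerabilities)))
        tbl = PrettyTable()
        tbl.field_names = ["Level", "Count", "New Count"]
        for level in ['ERROR', 'WARNING', 'NOTE', 'NONE']:
            key = level.lower()
            tbl.add_row([level, data[key + "_count"], data["new_" + key + "_count"]])
        print(tbl)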