-
Notifications
You must be signed in to change notification settings - Fork 0
/
Scrubber.py
95 lines (74 loc) · 3.72 KB
/
Scrubber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Scrub the VICS JSON file that is generated by Cellebrite PA.

A JSON file that is not a VICS data model is rejected as invalid.
The scrubbed JSON file is saved in the same directory as the source file,
with the suffix "_scrubbed" added to the file name.
"""
import re
import os
import ujson as json
from tqdm import tqdm
class Scrubber:
    """Remove a specific PhotoDNA hash value from a VICS JSON export.

    The source file is loaded, checked against the VICS data model
    identifier, scanned for media entries whose first alternative hash equals
    ``_PDNAVALUE``, and written back out next to the source file with
    ``_scrubbed`` inserted before the file extension.

    Fatal problems (unreadable file, unparseable or non-VICS JSON, unwritable
    output) are reported on stdout and terminate the process with exit
    status 1.
    """

    _IDENTIFIER = r"\bVICSDATAMODEL\b"  # VICS data model identifier
    _PDNAVALUE = "AAMANwABAD0AAQA9HgEBQ5oAFlFMAA0dAwYXcQMHDYcZBg2GUgotfcYAabpdABVPIyn/Gg1S/ylWTehHakHuaW8M5bjTBkV5JhNqAAZMnQBATLwDOX1fLEAgRG1SEzheHAQgAAMtNQgFYTU0DYpHKG0feShzBnwMBgMNAwEnFwkGM1gJGSqhATQUsANJDUwD"  # PhotoDNA hash value to be removed

    def __init__(self, jsonSourcePath):
        """Remember the source path and split it into directory and file name.

        Args:
            jsonSourcePath: Path of the JSON file to scrub.
        """
        self.jsonSourcePath = jsonSourcePath
        self.path = os.path.dirname(jsonSourcePath)
        self.fileName = os.path.basename(jsonSourcePath)
        self.dataModel = ""

    def _openJson(self):
        """Load the source JSON file and check that it is a VICS data model.

        Returns:
            The parsed JSON document.

        Raises:
            SystemExit: With code 1, after printing the reason, when the file
                cannot be opened or parsed, or when its ``@odata.context``
                does not contain the VICS data model identifier.
        """
        try:
            # The context manager closes the handle even if parsing fails.
            with open(self.jsonSourcePath, "r", encoding="utf-8") as f:
                data = json.load(f)
        except OSError as e:
            print(e.strerror)
            # SystemExit is what sys.exit() raises; using the builtin avoids
            # depending on a module-level ``import sys``.
            raise SystemExit(1)
        except ValueError as err:
            # JSON decode errors and invalid UTF-8 are both ValueError subclasses.
            print(str(err))
            raise SystemExit(1)

        context = data.get('@odata.context') if isinstance(data, dict) else None
        if isinstance(context, str):
            self.dataModel = context
            if re.search(self._IDENTIFIER, context):  # Check if the JSON file is a VICS data model
                return data
        print("Invalid JSON file")
        raise SystemExit(1)

    def _scrubbedFileName(self):
        """Return the output file name: the source name with ``_scrubbed`` before the extension.

        Splitting on the real extension guarantees the result always differs
        from the source name, so the source file is never overwritten (for
        example when the extension is upper-case or missing).
        """
        root, ext = os.path.splitext(self.fileName)
        return root + "_scrubbed" + ext

    @classmethod
    def _hasTargetHash(cls, media):
        """Return True when the first alternative hash of ``media`` equals ``_PDNAVALUE``.

        Entries with a missing, empty or malformed ``AlternativeHashes`` list
        are treated as not matching instead of raising.
        """
        try:
            return media['AlternativeHashes'][0]['HashValue'] == cls._PDNAVALUE
        except (KeyError, IndexError, TypeError):
            return False

    def scrub(self):
        """Scrub the loaded VICS JSON file and write the result next to the source.

        Prints the file details and the number of matching media entries, then
        removes ``HashName`` and ``HashValue`` from the first alternative hash
        of every matching entry and saves the document.

        Raises:
            SystemExit: With code 1 when the source cannot be loaded or the
                output file cannot be written.
        """
        data = self._openJson()
        mediaList = data['value'][0]['Media']
        fileCount = len(mediaList)
        self._fileInfo(data, fileCount)
        photoDNAFileCount = self._scanForPhotoDNA(data, fileCount)
        print("PhotoDNA File Found: " + str(photoDNAFileCount))
        if photoDNAFileCount > 0:
            print("\nRemoving PhotoDNA hash from VICS JSON File...")
            for media in tqdm(mediaList, "Scrubbing PhotoDNA hash..."):
                # Only touch entries that actually carry the target value, so
                # unrelated alternative hashes are left intact.
                if self._hasTargetHash(media):
                    firstHash = media['AlternativeHashes'][0]
                    firstHash.pop('HashName', None)
                    firstHash.pop('HashValue', None)
        # NOTE(review): the output file is written even when nothing matched;
        # confirm this is the intended behaviour.
        dataOut = json.dumps(data, indent=2, escape_forward_slashes=False)
        try:
            with open(os.path.join(self.path, self._scrubbedFileName()), "w", encoding="utf-8") as f:
                f.write(dataOut)
        except OSError as e:
            print(e.strerror)
            raise SystemExit(1)

    def _fileInfo(self, data, fileCount):
        """Print the file name, media count and source application of the JSON file.

        Args:
            data: The parsed JSON document.
            fileCount: Number of media entries to report.
        """
        app = data['value'][0]['SourceApplicationName']
        paVersion = data['value'][0]['SourceApplicationVersion']
        print("\nPA VICS JSON Scrubber")
        print("JSON File Details")
        print("-----------------")
        print("File Name: " + self.fileName)
        print("File Count: " + str(fileCount))
        print("PA Version: " + app + " " + paVersion)

    def _scanForPhotoDNA(self, data, fileCount):
        """Count the media entries whose first alternative hash equals ``_PDNAVALUE``.

        Args:
            data: The parsed JSON document.
            fileCount: Number of leading media entries to inspect.

        Returns:
            The number of matching media entries.
        """
        pDNAFileCount = 0
        for media in tqdm(data['value'][0]['Media'][:fileCount], "Scanning for PhotoDNA hash..."):
            if self._hasTargetHash(media):
                pDNAFileCount += 1
        return pDNAFileCount