Commit

preprocess speed improvements - find assets list on first access, add timer, invalidate cache in the end too, bulk check already processed jsons, add percentage progress
slatinsky committed Aug 27, 2023
1 parent 34d0bbb commit 63f53ce
Showing 6 changed files with 100 additions and 74 deletions.
7 changes: 6 additions & 1 deletion backend/preprocess/AssetProcessor.py
@@ -13,7 +13,7 @@ class AssetProcessor:
def __init__(self, file_finder: FileFinder, database: MongoDatabase):
self.file_finder = file_finder
self.collection_assets = database.get_collection("assets")
self.local_assets = file_finder.find_local_assets()
self.local_assets = None
self.fast_mode = False
self.cache = {}

@@ -150,6 +150,11 @@ def get_local_path(self, filename_with_hash: str) -> str:
"""
returns local path of asset
"""
# load local assets only if needed
# this cuts down loading time if no new exports were added
if self.local_assets is None:
self.local_assets = self.file_finder.find_local_assets()

if filename_with_hash in self.local_assets:
return self.local_assets[filename_with_hash]
else:
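
The change above defers the find_local_assets() directory scan until the first get_local_path() call, so a run where no new exports were added skips the scan entirely. A minimal sketch of the same lazy-initialization pattern in isolation (the LazyAssets class and scan_disk helper are illustrative, not part of this codebase):

    class LazyAssets:
        def __init__(self, scan_disk):
            self._scan_disk = scan_disk  # expensive directory walk, deferred
            self._assets = None          # None means "not scanned yet"

        def lookup(self, filename: str):
            # scan the disk only on first access; if lookup() is never
            # called, the expensive walk never happens
            if self._assets is None:
                self._assets = self._scan_disk()
            return self._assets.get(filename)
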
2 changes: 2 additions & 0 deletions backend/preprocess/ChannelCache.py
@@ -23,7 +23,9 @@ def invalidate_all(self):
"""
We need to invalidate cache to update
"""
print("invalidating cache")
for filename in os.listdir(self.cache_folder_path):
if filename.endswith(".json"):
print(" invalidating cache: " + filename)
os.remove(self.cache_folder_path + "/" + filename)

25 changes: 6 additions & 19 deletions backend/preprocess/FileFinder.py
@@ -20,33 +20,20 @@ def find_channel_exports(self):
if re.search(r"-([a-fA-F0-9]{5})\.json$", filename) != None:
continue

# ignore channel_info.json
# ignore channel_info.json and guild_info.json
if filename.endswith('channel_info.json'):
continue
if filename.endswith('guild_info.json'):
continue

try:
# quick check if file is a export made by DiscordChatExporter
with open(filename, encoding='utf-8') as file:
first_16_bytes = file.read(16)
if first_16_bytes.find("guild") == -1:
print("invalid file " + filename)
continue
filename_without_base_directory = self.remove_base_directory(filename)
files.append(filename_without_base_directory)

except PermissionError:
print("permission error while reading file " + filename)
print(traceback.format_exc())
filename_without_base_directory = self.remove_base_directory(filename)
files.append(filename_without_base_directory)

except Exception as e:
# we don't want to crash the program if a file is unreadable
# just print the error and continue
print("error while reading file " + filename)
print(traceback.format_exc())

return files

def find_local_assets(self):
print("finding local assets in " + self.base_directory)
input_directory = self.base_directory
all_files = {}
# file can be extensionless and without a dash
63 changes: 14 additions & 49 deletions backend/preprocess/JsonProcessor.py
@@ -4,6 +4,7 @@
import hashlib
from itertools import zip_longest
import os
import ijson
from pprint import pprint

from pymongo import ReplaceOne, UpdateOne
@@ -19,7 +20,7 @@


class JsonProcessor:
def __init__(self, database: MongoDatabase, file_finder: FileFinder, json_path:str, asset_processor: AssetProcessor):
def __init__(self, database: MongoDatabase, file_finder: FileFinder, json_path:str, asset_processor: AssetProcessor, index: int, total: int):
self.json_path = json_path
self.database = database
self.collection_guilds = self.database.get_collection("guilds")
Expand All @@ -32,6 +33,8 @@ def __init__(self, database: MongoDatabase, file_finder: FileFinder, json_path:s
self.collection_jsons = self.database.get_collection("jsons")
self.file_finder = file_finder
self.asset_processor = asset_processor
self.index = index
self.total = total

def process_guild(self, guild):
guild["_id"] = pad_id(guild.pop("id"))
@@ -369,48 +372,6 @@ def insert_role(self, role):
self.collection_roles.insert_one(role)
return

def check_if_processed(self, json_path):
"""
Checks if a file has already been processed
Returns True if the file has already been processed
Returns False if the file has not been processed yet
"""

json_path_with_base_dir = self.file_finder.add_base_directory(json_path)

# read from database
json = self.collection_jsons.find_one({"_id": json_path})

if json == None:
# file not found in database, it is new file
return False

# do quick checks first (date modified, file size), because hashing is slow

date_modified = os.path.getmtime(json_path_with_base_dir)
if json["date_modified"] == date_modified:
# if time modified is the same, file was not modified
return True

file_size = os.path.getsize(json_path_with_base_dir)
if json["size"] == file_size:
# file size is the same, file was not modified
return True

# slow check - file hash
file_hash = hashlib.sha256()
with open(json_path_with_base_dir, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
file_hash.update(byte_block)
hex_hash = file_hash.hexdigest()

if json["sha256_hash"] == hex_hash:
# file hash is the same, file was not modified
return True

# all checks failed, process the file again
self.collection_jsons.delete_one({"_id": json_path})

def mark_as_processed(self, json_path):
"""
Marks a file as processed by adding it to the jsons collection
Expand Down Expand Up @@ -469,17 +430,21 @@ def merge_messages(self, messages_list1: list, messages_list2: list) -> list:
return merged_messages

def process(self):
if self.check_if_processed(self.json_path):
print("already processed " + self.json_path)
return

print("processing " + self.json_path)
print(f"{self.index + 1}/{self.total} ({round((self.index + 1) / self.total * 100, 2)}%) processing {self.json_path}")

file_path_with_base_directory = self.file_finder.add_base_directory(self.json_path)
with JsonFileStreamer(file_path_with_base_directory) as jfs:
try:
guild = jfs.get_guild()
except ijson.common.IncompleteJSONError:
print(f' ERROR: IncompleteJSONError - file "{file_path_with_base_directory}" is corrupted')
return
if guild == None:
print("invalid file " + self.json_path)
return

file_size_human = jfs.get_file_size_human()
file_size = jfs.get_file_size()
guild = jfs.get_guild()
channel = jfs.get_channel()
print(f"guild: '{guild['name']}', channel '{channel['name']}, file size: {file_size_human}")

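
With the new index/total arguments, every processed file now prints a progress prefix. For example, index = 2 and total = 80 would produce a line like this (the path is illustrative):

    3/80 (3.75%) processing exports/general.json
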
15 changes: 15 additions & 0 deletions backend/preprocess/Timer.py
@@ -0,0 +1,15 @@
import time


class Timer(object):
def __init__(self, name=None):
self.name = name

def __enter__(self):
self.tstart = time.time()

def change_name(self, name):
self.name = name

def __exit__(self, type, value, traceback):
print(f'[{self.name}] took {round(time.time() - self.tstart, 2)} seconds')
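
Used as a context manager, as main_mongo.py does below, the timer prints the elapsed wall-clock time when the block exits:

    with Timer("Preprocess"):
        main(input_dir, output_dir)
    # prints something like: [Preprocess] took 12.34 seconds
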
62 changes: 57 additions & 5 deletions backend/preprocess/main_mongo.py
@@ -11,6 +11,7 @@
from AssetProcessor import AssetProcessor
from JsonProcessor import JsonProcessor
from Downloader import download_gg
from Timer import Timer

# fix PIPE encoding error on Windows, auto flush print
sys.stdout.reconfigure(encoding='utf-8')
@@ -43,6 +44,46 @@ def wipe_database(database):
version["value"] = EXPECTED_VERSION
config.update_one({"key": "version"}, {"$set": version})

def remove_processed_jsons(database, jsons):
"""
removes jsons from the list that are already processed
"""
database_jsons = database.get_collection("jsons").find()
for database_json in database_jsons:
json_path = database_json["_id"]
if json_path in jsons:
jsons.remove(json_path)

# TODO: add these checks back (date modified, file size, file hash)
# if json == None:
# # file not found in database, it is new file
# return False

# # do quick checks first (date modified, file size), because hashing is slow

# date_modified = os.path.getmtime(json_path_with_base_dir)
# if json["date_modified"] == date_modified:
# # if time modified is the same, file was not modified
# return True

# file_size = os.path.getsize(json_path_with_base_dir)
# if json["size"] == file_size:
# # file size is the same, file was not modified
# return True

# # slow check - file hash
# file_hash = hashlib.sha256()
# with open(json_path_with_base_dir, "rb") as f:
# for byte_block in iter(lambda: f.read(4096), b""):
# file_hash.update(byte_block)
# hex_hash = file_hash.hexdigest()

# if json["sha256_hash"] == hex_hash:
# # file hash is the same, file was not modified
# return True

return jsons
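
Note that remove_processed_jsons() above calls list.remove() once per database entry, which is linear in the list length each time. A set-based variant of the same bulk check would be equivalent and faster on large collections (a sketch, not what the commit does):

    processed = {doc["_id"] for doc in database.get_collection("jsons").find()}
    jsons = [json_path for json_path in jsons if json_path not in processed]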


def main(input_dir, output_dir):
print("main_mongo loaded")
@@ -54,28 +95,39 @@ def main(input_dir, output_dir):
# database.clear_database_except_assets()
# database.clear_assets()


file_finder = FileFinder(input_dir)

jsons = file_finder.find_channel_exports()
print("found " + str(len(jsons)) + " json channel exports")
jsons_count_before = len(jsons)
jsons = remove_processed_jsons(database, jsons)
jsons_count = len(jsons)
print(" found " + str(jsons_count) + " new possible json channel exports")
print(" found " + str(jsons_count_before - jsons_count) + " already processed json channel exports")

channel_cache = ChannelCache()
channel_cache.invalidate_all()
asset_processor = AssetProcessor(file_finder, database)
asset_processor.set_fast_mode(True) # don't process slow actions

for json_path in jsons:
p = JsonProcessor(database, file_finder, json_path, asset_processor)

for index, json_path in enumerate(jsons):
p = JsonProcessor(database, file_finder, json_path, asset_processor, index, jsons_count)
p.process()

download_gg(output_dir)

print("done")
# if user browses exports before they are processed, cached channels may be invalid again
# so we need to invalidate cache again
channel_cache = ChannelCache()

print("preprocess done")



if __name__ == "__main__":
input_dir = sys.argv[1]
output_dir = sys.argv[2]
main(input_dir, output_dir)
with Timer("Preprocess"):
main(input_dir, output_dir)
