diff --git a/rapyuta_io/clients/paramserver.py b/rapyuta_io/clients/paramserver.py index 7b7bf6be..3897acf4 100644 --- a/rapyuta_io/clients/paramserver.py +++ b/rapyuta_io/clients/paramserver.py @@ -3,6 +3,7 @@ import enum import errno import hashlib +import json import mimetypes import os import tempfile @@ -151,10 +152,16 @@ def process_dir(self, executor, rootdir, tree_path, level, dir_futures, file_fut future = executor.submit(self.create_binary_file, new_tree_path, full_path) if file_name.endswith('.yaml'): data = parse_yaml(full_path) - future = executor.submit(self.create_file, new_tree_path, data) + if self.should_upload_as_binary(data, self.yaml_content_type): + future = executor.submit(self.create_binary_file, new_tree_path, full_path) + else: + future = executor.submit(self.create_file, new_tree_path, data) elif file_name.endswith('.json'): data = parse_json(full_path) - future = executor.submit(self.create_file, new_tree_path, data, content_type=self.json_content_type) + if self.should_upload_as_binary(data, self.json_content_type): + future = executor.submit(self.create_binary_file, new_tree_path, full_path) + else: + future = executor.submit(self.create_file, new_tree_path, data, content_type=self.json_content_type) else: future = executor.submit(self.create_binary_file, new_tree_path, full_path) file_futures[future] = new_tree_path @@ -167,22 +174,45 @@ def process_folder(self, executor, rootdir, tree_path, level, dir_futures, file_ if isdir(full_path): future = executor.submit(self.create_folder, new_tree_path) dir_futures[future] = (new_tree_path, level + 1) - else: - file_stat = os.stat(full_path) - file_name = os.path.basename(full_path) - if file_stat.st_size > self.max_non_binary_size: + continue + file_stat = os.stat(full_path) + file_name = os.path.basename(full_path) + if file_stat.st_size > self.max_non_binary_size: + future = executor.submit(self.create_binary_file, new_tree_path, full_path) + elif file_name.endswith('.yaml'): + data = parse_yaml(full_path) + if self.should_upload_as_binary(data, self.yaml_content_type): future = executor.submit(self.create_binary_file, new_tree_path, full_path) - elif file_name.endswith('.yaml'): - data = parse_yaml(full_path) - future = executor.submit(self.create_file, new_tree_path, data) - elif file_name.endswith('.json'): - data = parse_json(full_path) - future = executor.submit(self.create_file, new_tree_path, data, content_type=self.json_content_type) else: + future = executor.submit(self.create_file, new_tree_path, data) + elif file_name.endswith('.json'): + data = parse_json(full_path) + if self.should_upload_as_binary(data, self.json_content_type): future = executor.submit(self.create_binary_file, new_tree_path, full_path) - file_futures[future] = new_tree_path + else: + future = executor.submit(self.create_file, new_tree_path, data, content_type=self.json_content_type) + else: + future = executor.submit(self.create_binary_file, new_tree_path, full_path) + file_futures[future] = new_tree_path + return dir_futures, file_futures + def should_upload_as_binary(self, filedata, content_type): + """Determines if the file should be uploaded as binary based on the file size + + While the file data may be less than the supported limit, the combined size of + the API payload is what is stored by paramserver. This method calculates the + size of the payload and determines if it exceeds the supported limit. If it does, + the file is uploaded as a binary to the blob store. + + We cannot entirely rely on the file stat result since the file data is sent as + a string in the API payload and the final size inflates when json.dumps is + performed on the final payload. + """ + metadata_size_buffer = 200 # In bytes + payload = {'type': _Node.File, 'data': filedata, 'contentType': content_type} + return len(json.dumps(payload)) + metadata_size_buffer > self.max_non_binary_size + def upload_configurations(self, rootdir, tree_names, delete_existing_trees, as_folder=False): self.validate_args(rootdir, tree_names, delete_existing_trees, as_folder) with futures.ThreadPoolExecutor(max_workers=15) as executor: