Merge pull request #5 from sculley/feature/adding-multi-threaded-workers
adding multi-threaded support
bunnykek authored Oct 20, 2023
2 parents 4f2c658 + e60df16 commit 314d098
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions WeTransferTool/wetransfertool.py
@@ -6,6 +6,7 @@
 import requests
 import hashlib
 import os
+import concurrent.futures

 class We:
     def __init__(self):
@@ -22,7 +23,7 @@ def __init__(self):
         self.__session = requests.Session()
         self.__session.headers.update({'X-Requested-With': 'XMLHttpRequest'})

-    def upload(self, path: str, display_name: str = '', message: str = ''):
+    def upload(self, path: str, display_name: str = '', message: str = '', max_workers: int = 10):
         """Returns a json containing the metadata and the link to the uploaded file/folder"""

         print("Uploading", os.path.basename(path))
@@ -34,7 +35,7 @@ def upload(self, path: str, display_name: str = '', message: str = ''):
         files = files_response['files']
         auth_bearer = files_response['storm_upload_token']
         self.endpoints = self.__decodejwt(auth_bearer)
-        return self.__process_files(files, transfer_id, path, type, auth_bearer)
+        return self.__process_files(files, transfer_id, path, type, auth_bearer, max_workers)

     def download(self, download_url: str, download_path: str = ''):
         """Downloads from a url
@@ -135,7 +136,7 @@ def __link_files(self, files: list, display_name: str, message: str):
         else:
             raise Exception("link files error\n", response.text)

-    def __process_files(self, files: dict, transfer_id: str, path: str, type: str, auth_bearer: str):
+    def __process_files(self, files: dict, transfer_id: str, path: str, type: str, auth_bearer: str, max_workers: int):
         items = []
         contentlenforblocks = []
         content_md5 = []
@@ -195,9 +196,9 @@ def __process_files(self, files: dict, transfer_id: str, path: str, type: str, auth_bearer: str):

         # print(s3_urls)

-        self.__upload_chunks(files_path, s3_urls)
+        self.__upload_chunks(files_path, s3_urls, max_workers=max_workers)

-        time.sleep(2)
+        time.sleep(4)

         self.__batch(file_name_bcount, s3_urls, auth_bearer)

@@ -264,25 +265,39 @@ def __blocks(self, blocks: list, auth_bearer: str):
                 rblock['put_request_headers']['Content-MD5'], rblock['block_id']])
         return s3_urls

-    def __upload_chunks(self, files_path: list, s3_urls: list):
+    def __upload_chunk(self, s3_url, chunk, chunk_number, total_chunks):
         headers = {
-            'Content-MD5': '',
+            'Content-MD5': s3_url[1],
         }

-        i = 0
-        for file_path in files_path:
-            with open(file_path, 'rb') as file:
-                while chunk := file.read(15728640):
-                    headers['Content-MD5'] = s3_urls[i][1]
-                    response = self.__session.put(
-                        s3_urls[i][0], data=chunk, headers=headers)
-                    i += 1
-                    if response.status_code != 200:
-                        raise Exception(
-                            'Error on upload_chunks\n', response.text)
-            print(f'Uploaded {os.path.basename(file_path)}')
+        response = self.__session.put(s3_url[0], data=chunk, headers=headers)
+
+        if response.status_code != 200:
+            print('Error on upload_chunk', response.text)
+            return False
+
+        return True
+
+    def __upload_chunks(self, files_path: list, s3_urls: list, max_workers: int):
+        total_chunks = len(s3_urls)
+
+        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+            futures = []
+            chunk_number = 1
+
+            for file_path in files_path:
+                with open(file_path, 'rb') as file:
+                    while (chunk := file.read(15728640)):
+                        futures.append(executor.submit(
+                            self.__upload_chunk, s3_urls[chunk_number - 1], chunk, chunk_number, total_chunks))
+                        chunk_number += 1
+
+            for future in concurrent.futures.as_completed(futures):
+                if not future.result():
+                    raise Exception('Error in multi-threaded chunk upload')
+
+        print(f'Uploaded {", ".join([os.path.basename(path) for path in files_path])}')

     def __finalize_chunks_upload(self, transfer_id: str):

         json_data = {
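The rewrite splits the old serial loop in two: __upload_chunk PUTs one 15 MiB chunk (15728640 bytes) to its presigned URL, while __upload_chunks fans the chunks out across a concurrent.futures.ThreadPoolExecutor and then joins on the futures, raising if any PUT failed. A standalone sketch of the same fan-out/join pattern (put_chunk, put_chunks, and the zip pairing of URLs with chunks are illustrative, not part of this file):

import concurrent.futures
import requests

def put_chunk(session: requests.Session, url: str, md5: str, chunk: bytes) -> bool:
    # One presigned-URL PUT per chunk; Content-MD5 must match the block's digest.
    response = session.put(url, data=chunk, headers={'Content-MD5': md5})
    return response.status_code == 200

def put_chunks(session, s3_urls, chunks, max_workers=10):
    # Each s3_urls entry is [url, content_md5, block_id], as built by __blocks.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(put_chunk, session, url, md5, chunk)
                   for (url, md5, _block_id), chunk in zip(s3_urls, chunks)]
        # as_completed yields futures in completion order, not submission order.
        for future in concurrent.futures.as_completed(futures):
            if not future.result():
                raise Exception('Error in multi-threaded chunk upload')

One caveat: the patched code shares a single requests.Session across worker threads, and requests does not formally guarantee Session thread-safety, though simple concurrent PUTs like these generally work in practice.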
