
Merge pull request #12 from gdcc/other-io-types
Support multiple `IO` types
JR-1991 authored Apr 28, 2024
2 parents 15ea821 + c4b40bc · commit eda8180
Showing 13 changed files with 290 additions and 174 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -48,8 +48,8 @@ import dvuploader as dv
# Add file individually
files = [
dv.File(filepath="./small.txt"),
dv.File(directoryLabel="some/dir", filepath="./medium.txt"),
dv.File(directoryLabel="some/dir", filepath="./big.txt"),
dv.File(directory_label="some/dir", filepath="./medium.txt"),
dv.File(directory_label="some/dir", filepath="./big.txt"),
*dv.add_directory("./data"), # Add an entire directory
]

@@ -88,7 +88,7 @@ Alternatively, you can also supply a `config` file that contains all necessary information
* `api_token`: API token of the Dataverse instance.
* `files`: List of files to upload. Each file is a dictionary with the following keys:
* `filepath`: Path to the file to upload.
* `directoryLabel`: Optional directory label to upload the file to.
* `directory_label`: Optional directory label to upload the file to.
* `description`: Optional description of the file.
* `mimetype`: Mimetype of the file.
* `categories`: Optional list of categories to assign to the file.
@@ -104,9 +104,9 @@ api_token: XXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
files:
- filepath: ./small.txt
- filepath: ./medium.txt
directoryLabel: some/dir
directory_label: some/dir
- filepath: ./big.txt
directoryLabel: some/dir
directory_label: some/dir
```
The `config` file can then be used as follows:
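The README's own usage snippet is collapsed in this diff view. As a rough sketch of what consuming such a config could look like (assuming PyYAML is installed; the `config.yml` path and the manual loading are illustrative, not the library's own config handling):

```python
import yaml  # PyYAML, assumed to be installed
import dvuploader as dv

# Load the YAML config shown above (the path is illustrative).
with open("config.yml") as f:
    config = yaml.safe_load(f)

# Build File objects, passing directory_label only when the entry defines it.
files = []
for entry in config["files"]:
    kwargs = {"filepath": entry["filepath"]}
    if "directory_label" in entry:
        kwargs["directory_label"] = entry["directory_label"]
    files.append(dv.File(**kwargs))

api_token = config["api_token"]
```

Optional keys from the list above, such as `description`, `mimetype`, and `categories`, would be forwarded the same way.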
37 changes: 25 additions & 12 deletions dvuploader/checksum.py
@@ -1,7 +1,7 @@
import hashlib
from enum import Enum
import os
from typing import Callable
from typing import IO, Callable

from pydantic import BaseModel, ConfigDict, Field

@@ -34,34 +34,41 @@ class Checksum(BaseModel):
value (str): The value of the checksum.
"""

model_config = ConfigDict(populate_by_name=True)
model_config = ConfigDict(
arbitrary_types_allowed=True,
populate_by_name=True,
)

type: str = Field(..., alias="@type")
value: str = Field(..., alias="@value")

@classmethod
def from_file(
cls,
fpath: str,
handler: IO,
hash_fun: Callable,
hash_algo: str,
) -> "Checksum":
"""Takes a file path and returns a checksum object.
Args:
fpath (str): The file path to generate the checksum for.
handler (IO): The file handler to generate the checksum for.
hash_fun (Callable): The hash function to use for generating the checksum.
hash_algo (str): The hash algorithm to use for generating the checksum.
Returns:
Checksum: A Checksum object with type and value fields.
"""

value = cls._chunk_checksum(fpath=fpath, hash_fun=hash_fun)
value = cls._chunk_checksum(handler=handler, hash_fun=hash_fun)
return cls(type=hash_algo, value=value) # type: ignore

@staticmethod
def _chunk_checksum(fpath: str, hash_fun: Callable, blocksize=2**20) -> str:
def _chunk_checksum(
handler: IO,
hash_fun: Callable,
blocksize=2**20
) -> str:
"""Chunks a file and returns a checksum.
Args:
@@ -73,10 +80,16 @@ def _chunk_checksum(fpath: str, hash_fun: Callable, blocksize=2**20) -> str:
str: A string representing the checksum of the file.
"""
m = hash_fun()
with open(fpath, "rb") as f:
while True:
buf = f.read(blocksize)
if not buf:
break
m.update(buf)
while True:
buf = handler.read(blocksize)

if not isinstance(buf, bytes):
buf = buf.encode()

if not buf:
break
m.update(buf)

handler.seek(0)

return m.hexdigest()
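With the `IO`-based signature above, a checksum can be computed from any readable handler rather than from a file path. A short sketch under those assumptions; the sample content and the `MD5` label are illustrative, while the import path follows `dvuploader/checksum.py`:

```python
import hashlib
import io

from dvuploader.checksum import Checksum

# A binary handler: read() returns bytes, which are hashed directly.
binary_checksum = Checksum.from_file(
    handler=io.BytesIO(b"in-memory content"),
    hash_fun=hashlib.md5,
    hash_algo="MD5",
)

# A text handler: read() returns str, which _chunk_checksum encodes to bytes.
text_checksum = Checksum.from_file(
    handler=io.StringIO("in-memory content"),
    hash_fun=hashlib.md5,
    hash_algo="MD5",
)

# Identical content yields the same digest regardless of handler type, and
# the handler is rewound via seek(0) so it can be reused for the upload.
assert binary_checksum.value == text_checksum.value
```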
58 changes: 34 additions & 24 deletions dvuploader/directupload.py
@@ -75,7 +75,7 @@ async def direct_upload(
if status is True:
continue

print(f"❌ Failed to upload file '{file.fileName}' to the S3 storage")
print(f"❌ Failed to upload file '{file.file_name}' to the S3 storage")

headers = {
"X-Dataverse-key": api_token,
@@ -129,23 +129,19 @@ async def _upload_to_store(

await asyncio.sleep(delay)

assert file.fileName is not None, "File name is None"
assert os.path.exists(file.filepath), f"File {file.filepath} does not exist"

file_size = os.path.getsize(file.filepath)
ticket = await _request_ticket(
session=session,
dataverse_url=dataverse_url,
api_token=api_token,
file_size=file_size,
file_size=file._size,
persistent_id=persistent_id,
)

if not "urls" in ticket:
status, storage_identifier = await _upload_singlepart(
session=session,
ticket=ticket,
filepath=file.filepath,
file=file,
pbar=pbar,
progress=progress,
api_token=api_token,
@@ -156,7 +152,7 @@
status, storage_identifier = await _upload_multipart(
session=session,
response=ticket,
filepath=file.filepath,
file=file,
dataverse_url=dataverse_url,
pbar=pbar,
progress=progress,
@@ -207,7 +203,7 @@ async def _request_ticket(
async def _upload_singlepart(
session: aiohttp.ClientSession,
ticket: Dict,
filepath: str,
file: File,
pbar,
progress,
api_token: str,
@@ -241,21 +237,16 @@ async def _upload_singlepart(
params = {
"headers": headers,
"url": ticket["url"],
"data": open(filepath, "rb"),
"data": file.handler,
}

async with session.put(**params) as response:
status = response.status == 200
response.raise_for_status()

if status:
progress.update(
pbar,
advance=os.path.getsize(filepath),
)

progress.update(pbar, advance=file._size)
await asyncio.sleep(0.1)

progress.update(
pbar,
visible=leave_bar,
@@ -267,7 +258,7 @@
async def _upload_multipart(
session: aiohttp.ClientSession,
response: Dict,
filepath: str,
file: File,
dataverse_url: str,
pbar,
progress,
@@ -279,7 +270,7 @@
Args:
session (aiohttp.ClientSession): The aiohttp client session.
response (Dict): The response from the Dataverse API containing the upload ticket information.
filepath (str): The path to the file to be uploaded.
file (File): The file object to be uploaded.
dataverse_url (str): The URL of the Dataverse instance.
pbar (tqdm): A progress bar to track the upload progress.
progress: The progress callback function.
@@ -301,19 +292,20 @@

try:
e_tags = await _chunked_upload(
filepath=filepath,
file=file,
session=session,
urls=urls,
chunk_size=chunk_size,
pbar=pbar,
progress=progress,
)
except Exception as e:
print(f"❌ Failed to upload file '{filepath}' to the S3 storage")
print(f"❌ Failed to upload file '{file.file_name}' to the S3 storage")
await _abort_upload(
session=session,
url=abort,
dataverse_url=dataverse_url,
api_token=api_token,
)
raise e

@@ -329,7 +321,7 @@ async def _upload_multipart(


async def _chunked_upload(
filepath: str,
file: File,
session: aiohttp.ClientSession,
urls,
chunk_size: int,
@@ -340,7 +332,7 @@
Uploads a file in chunks to multiple URLs using the provided session.
Args:
filepath (str): The path of the file to upload.
file (File): The file object to upload.
session (aiohttp.ClientSession): The aiohttp client session to use for the upload.
urls: An iterable of URLs to upload the file chunks to.
chunk_size (int): The size of each chunk in bytes.
@@ -351,7 +343,17 @@
List[str]: A list of ETags returned by the server for each uploaded chunk.
"""
e_tags = []
async with aiofiles.open(filepath, "rb") as f:

if not os.path.exists(file.filepath):
raise NotImplementedError(
"""
Multipart chunked upload is currently only supported for local files and no in-memory objects.
Please save the handlers content to a local file and try again.
"""
)

async with aiofiles.open(file.filepath, "rb") as f:
chunk = await f.read(chunk_size)
e_tags.append(
await _upload_chunk(
@@ -459,6 +461,7 @@ async def _abort_upload(
session: aiohttp.ClientSession,
url: str,
dataverse_url: str,
api_token: str,
):
"""
Aborts an ongoing upload by sending a DELETE request to the specified URL.
@@ -467,11 +470,17 @@
session (aiohttp.ClientSession): The aiohttp client session.
url (str): The URL to send the DELETE request to.
dataverse_url (str): The base URL of the Dataverse instance.
api_token (str): The API token to use for the request.
Raises:
aiohttp.ClientResponseError: If the DELETE request fails.
"""
async with session.delete(urljoin(dataverse_url, url)) as response:

headers = {
"X-Dataverse-key": api_token,
}

async with session.delete(urljoin(dataverse_url, url), headers=headers) as response:
response.raise_for_status()


@@ -563,6 +572,7 @@ async def _multipart_json_data_request(
Returns:
None
"""

with aiohttp.MultipartWriter("form-data") as writer:
json_part = writer.append(json_data)
json_part.set_content_disposition("form-data", name="jsonData")
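As noted in the `_chunked_upload` hunk above, multipart uploads still require a local file, and the new error message suggests saving a handler's content to disk first. A minimal sketch of that workaround; the helper name and temporary-file handling are illustrative and not part of dvuploader:

```python
import io
import tempfile
from typing import IO


def materialize_handler(handler: IO, suffix: str = "") -> str:
    """Write an in-memory handler to a temporary file and return its path.

    The returned path can then be passed as `filepath`, which keeps multipart
    uploads on the local-file route they currently support.
    """
    handler.seek(0)
    data = handler.read()

    # Mirror the checksum helper: text handlers are encoded to bytes.
    if not isinstance(data, bytes):
        data = data.encode()

    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(data)
        return tmp.name


# Usage (illustrative): back a BytesIO payload with a real file before uploading.
filepath = materialize_handler(io.BytesIO(b"large payload"), suffix=".bin")
```

The temporary file is not deleted automatically (`delete=False`), so a caller would remove it once the upload finishes.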