Transfer abstraction + multipart upload abstraction #587

Open
wants to merge 1 commit into base: master
20 changes: 20 additions & 0 deletions invenio_records_resources/ext.py
@@ -8,6 +8,7 @@

"""Invenio Records Resources module to create REST APIs."""


from . import config
from .registry import NotificationRegistry, ServiceRegistry

@@ -22,13 +23,32 @@ def __init__(self, app=None):

def init_app(self, app):
"""Flask application initialization."""

# imported here to prevent a circular import inside the .services module
from .services.files.transfer.registry import TransferRegistry

self.init_config(app)
self.registry = ServiceRegistry()
self.notification_registry = NotificationRegistry()
self.transfer_registry = TransferRegistry()
self.register_builtin_transfers()
app.extensions["invenio-records-resources"] = self

def init_config(self, app):
"""Initialize configuration."""
for k in dir(config):
if k.startswith("RECORDS_RESOURCES_") or k.startswith("SITE_"):
app.config.setdefault(k, getattr(config, k))

def register_builtin_transfers(self):
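"""Register the built-in transfer types."""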
from .services.files.transfer import (
FetchTransfer,
LocalTransfer,
MultipartTransfer,
RemoteTransfer,
)

self.transfer_registry.register(LocalTransfer)
self.transfer_registry.register(FetchTransfer)
self.transfer_registry.register(RemoteTransfer)
self.transfer_registry.register(MultipartTransfer)
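The same registry can be extended by other packages. A minimal sketch, assuming only the "invenio-records-resources" extension key and the register() method shown above; MyTransfer and its base class choice are hypothetical:

from invenio_records_resources.services.files.transfer import LocalTransfer


class MyTransfer(LocalTransfer):
    """Hypothetical transfer reusing the local transfer behaviour."""
    # A real implementation would declare its own transfer type identifier so
    # it does not clash with the built-in local transfer.


def register_my_transfer(app):
    """Register the custom transfer on an already initialized application."""
    ext = app.extensions["invenio-records-resources"]
    ext.transfer_registry.register(MyTransfer)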
4 changes: 4 additions & 0 deletions invenio_records_resources/proxies.py
@@ -21,3 +21,7 @@
lambda: current_app.extensions["invenio-records-resources"].notification_registry
)
"""Helper proxy to get the current notifications registry."""

current_transfer_registry = LocalProxy(
lambda: current_app.extensions["invenio-records-resources"].transfer_registry
)
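"""Helper proxy to get the current transfer registry."""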
1 change: 0 additions & 1 deletion invenio_records_resources/records/resolver.py
@@ -11,7 +11,6 @@
import uuid

from invenio_db import db
from invenio_pidstore.errors import PIDDeletedError
from invenio_pidstore.models import PersistentIdentifier, PIDStatus


@@ -8,7 +8,6 @@

"""Systemfield for managing referenced entities in request."""

from functools import partial

from invenio_records.systemfields import SystemField

19 changes: 17 additions & 2 deletions invenio_records_resources/records/systemfields/files/manager.py
@@ -153,14 +153,29 @@ def unlock(self):

# TODO: "create" and "update" should be merged somehow...
@ensure_enabled
def create(self, key, obj=None, stream=None, data=None, **kwargs):
def create(
self,
key,
obj=None,
stream=None,
data=None,
transfer_type=None,
transfer_metadata=None,
**kwargs,
):
"""Create/initialize a file."""
assert not (obj and stream)

if key in self:
raise InvalidKeyError(description=f"File with key {key} already exists.")

rf = self.file_cls.create({}, key=key, record_id=self.record.id)
rf = self.file_cls.create(
{},
key=key,
record_id=self.record.id,
transfer={**(transfer_metadata or {}), "transfer_type": transfer_type},
)

if stream:
obj = ObjectVersion.create(self.bucket, key, stream=stream, **kwargs)
if obj:
1 change: 1 addition & 0 deletions invenio_records_resources/resources/files/config.py
@@ -24,6 +24,7 @@ class FileResourceConfig(ResourceConfig):
"list": "/files",
"item": "/files/<path:key>",
"item-content": "/files/<path:key>/content",
"item-multipart-content": "/files/<path:key>/content/<int:part>",
"item-commit": "/files/<path:key>/commit",
"list-archive": "/files-archive",
}
43 changes: 42 additions & 1 deletion invenio_records_resources/resources/files/resource.py
@@ -13,7 +13,7 @@
from contextlib import ExitStack

import marshmallow as ma
from flask import Response, abort, current_app, g, stream_with_context
from flask import Response, current_app, g, stream_with_context
from flask_resources import (
JSONDeserializer,
RequestBodyParser,
@@ -50,6 +59,15 @@
default_content_type="application/octet-stream",
)

request_multipart_args = request_parser(
{
"pid_value": ma.fields.Str(required=True),
"key": ma.fields.Str(),
"part": ma.fields.Int(),
},
location="view_args",
)


#
# Resource
@@ -84,6 +93,16 @@ def create_url_rules(self):
route("POST", routes["item-commit"], self.create_commit),
route("PUT", routes["item-content"], self.update_content),
]
if "item-multipart-content" in routes:
# If multipart URLs are present in the routes, register them here.
# Currently RDM records do not define them.
url_rules += [
route(
"PUT",
routes["item-multipart-content"],
self.update_multipart_content,
),
]
return url_rules

@request_view_args
@@ -236,3 +255,25 @@ def update_content(self):
)

return item.to_dict(), 200

@request_multipart_args
@request_stream
@response_handler()
def update_multipart_content(self):
"""Upload file content."""
item = self.service.set_multipart_file_content(
g.identity,
resource_requestctx.view_args["pid_value"],
resource_requestctx.view_args["key"],
resource_requestctx.view_args["part"],
resource_requestctx.data["request_stream"],
content_length=resource_requestctx.data["request_content_length"],
)

# if errors are set then there was a `TransferException` raised
if item.to_dict().get("errors"):
raise FailedFileUploadException(
file_key=item.file_id, recid=item.id, file=item.to_dict()
)

return item.to_dict(), 200
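The new endpoint only fixes the /files/<path:key>/content/<int:part> suffix. A rough client-side sketch with the requests library follows; the API prefix, record id, file name and part size are assumptions, and authentication is omitted:

import requests

# Only the /files/<key>/content/<part> and /files/<key>/commit suffixes come
# from the routes in this PR; the rest of the URL is assumed.
BASE = "https://localhost:5000/api/records/abcd-1234/files/data.zip"
PART_SIZE = 10 * 1024 * 1024  # upload the local file in 10 MiB parts

with open("data.zip", "rb") as source:
    part = 1
    while chunk := source.read(PART_SIZE):
        requests.put(
            f"{BASE}/content/{part}",
            data=chunk,
            headers={"Content-Type": "application/octet-stream"},
        )
        part += 1

# Once all parts are uploaded, commit the file via the existing item-commit route.
requests.post(f"{BASE}/commit")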
2 changes: 0 additions & 2 deletions invenio_records_resources/services/custom_fields/base.py
@@ -28,13 +28,11 @@ def __init__(self, name, field_args=None):
@abstractproperty
def mapping(self):
"""Return the mapping."""
pass

@property
@abstractproperty
def field(self):
"""Marshmallow field for custom fields."""
pass

@property
def ui_field(self):
@@ -11,11 +11,13 @@
from .base import FileServiceComponent
from .content import FileContentComponent
from .metadata import FileMetadataComponent
from .multipart import FileMultipartContentComponent
from .processor import FileProcessorComponent

__all__ = (
"FileContentComponent",
"FileMetadataComponent",
"FileProcessorComponent",
"FileServiceComponent",
"FileMultipartContentComponent",
)
11 changes: 10 additions & 1 deletion invenio_records_resources/services/files/components/base.py
@@ -51,4 +51,13 @@ def set_file_content(self, identity, id_, file_key, stream, content_length, reco

def get_file_content(self, identity, id_, file_key, record):
"""Get file content handler."""
pass

def get_file_transfer_metadata(
self, identity, id, file_key, record, transfer_metadata
):
"""Get file transfer metadata handler."""

def update_file_transfer_metadata(
self, identity, id, file_key, record, transfer_metadata
):
"""Update file transfer metadata handler."""
11 changes: 5 additions & 6 deletions invenio_records_resources/services/files/components/content.py
@@ -7,8 +7,8 @@
# details.

"""Files service components."""
from ....proxies import current_transfer_registry
from ...errors import FailedFileUploadException, TransferException
from ..transfer import Transfer
from .base import FileServiceComponent


@@ -23,12 +23,11 @@ def set_file_content(self, identity, id, file_key, stream, content_length, recor
if file_record is None:
raise Exception(f'File with key "{file_key}" has not been initialized yet.')

file_type = file_record.file.storage_class if file_record.file else None
transfer = Transfer.get_transfer(file_type)
transfer = current_transfer_registry.get_transfer(
record=record, file_record=file_record, service=self.service, uow=self.uow
)
try:
transfer.set_file_content(
record, file_record.file, file_key, stream, content_length
)
transfer.set_file_content(stream, content_length)
except TransferException as e:
failed = record.files.delete(file_key, softdelete_obj=False, remove_rf=True)
raise FailedFileUploadException(
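With this change the registry constructs a transfer per file record and set_file_content() only receives the stream. An illustrative stand-in, not the package's actual base class, that satisfies this contract:

class EchoTransfer:
    """Illustrative transfer honouring the instance-based interface used above."""

    def __init__(self, record=None, file_record=None, service=None, uow=None, **kwargs):
        # Keyword names taken from the get_transfer() call above.
        self.record = record
        self.file_record = file_record
        self.service = service
        self.uow = uow

    def set_file_content(self, stream, content_length):
        # A real transfer would hand the stream to the storage backend; here we
        # only consume it to show the call signature.
        return len(stream.read(content_length))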
113 changes: 102 additions & 11 deletions invenio_records_resources/services/files/components/metadata.py
@@ -10,8 +10,14 @@

from copy import deepcopy

import marshmallow as ma
from flask import current_app
from flask_babel import gettext as _
from invenio_files_rest.errors import FileSizeError

from ....proxies import current_transfer_registry
from ...errors import FilesCountExceededException
from ..transfer import Transfer
from ...uow import RecordCommitOp
from .base import FileServiceComponent


@@ -20,8 +26,29 @@ class FileMetadataComponent(FileServiceComponent):

def init_files(self, identity, id, record, data):
"""Init files handler."""
schema = self.service.file_schema.schema(many=True)
validated_data = schema.load(data)

validated_data = []
if not isinstance(data, list):
raise ma.ValidationError("Expected a list of files.")

for idx, file_metadata in enumerate(data):
transfer_type = file_metadata.get("transfer_type", None)

schema_cls = self.get_transfer_type_schema(transfer_type)

schema = schema_cls()

try:
validated_data.append(schema.load(file_metadata))
except ma.ValidationError as e:
# add index to the error
raise ma.ValidationError(
e.messages_dict,
field_name=idx,
data=e.data,
valid_data=e.valid_data,
**e.kwargs,
)

# Brand-new drafts must not exceed the files limit when files are added via the REST API.
# Records that already contained more files than the limit can continue adding files.
@@ -35,13 +62,51 @@
max_files=maxFiles, resulting_files_count=resulting_files_count
)

for file_data in validated_data:
copy_fdata = deepcopy(file_data)
file_type = copy_fdata.pop("storage_class", None)
transfer = Transfer.get_transfer(
file_type, service=self.service, uow=self.uow
for file_metadata in validated_data:
temporary_obj = deepcopy(file_metadata)
transfer_type = temporary_obj.pop("transfer_type", None)

transfer = current_transfer_registry.get_transfer(
transfer_type=transfer_type,
record=record,
service=self.service,
uow=self.uow,
)

_ = transfer.init_file(record, temporary_obj)

def get_transfer_type_schema(self, transfer_type):
"""
Get the transfer type schema. If the transfer type is not provided, the default schema is returned.
If the transfer type is provided, the schema is created dynamically as a union of the default schema
and the transfer type schema.

Implementation details:
For performance reasons, the schema is cached in the service config under "_file_transfer_schemas" key.
"""
schema_cls = self.service.file_schema.schema
if not transfer_type:
return schema_cls

if not hasattr(self.service.config, "_file_transfer_schemas"):
self.service.config._file_transfer_schemas = {}

# look up the cache first
if transfer_type in self.service.config._file_transfer_schemas:
return self.service.config._file_transfer_schemas[transfer_type]

# not cached yet: create a subclass and store it in the cache
transfer = current_transfer_registry.get_transfer(
transfer_type=transfer_type,
)
if transfer.Schema:
schema_cls = type(
f"{schema_cls.__name__}Transfer{transfer_type}",
(transfer.Schema, schema_cls),
{},
)
_ = transfer.init_file(record, copy_fdata)
self.service.config._file_transfer_schemas[transfer_type] = schema_cls
return schema_cls

def update_file_metadata(self, identity, id, file_key, record, data):
"""Update file metadata handler."""
@@ -54,7 +119,33 @@ def update_file_metadata(self, identity, id, file_key, record, data):
validated_data = schema.load(data)
record.files.update(file_key, data=validated_data)

# TODO: `commit_file` might vary based on your storage backend (e.g. S3)
def update_transfer_metadata(
self, identity, id, file_key, record, transfer_metadata
):
"""Update file transfer metadata handler."""
file = record.files[file_key]

file.transfer.set(transfer_metadata)
self.uow.register(RecordCommitOp(file))

def commit_file(self, identity, id, file_key, record):
"""Commit file handler."""
Transfer.commit_file(record, file_key)

transfer = current_transfer_registry.get_transfer(
record=record,
file_record=record.files.get(file_key),
service=self.service,
uow=self.uow,
)

transfer.commit_file()

f_obj = record.files.get(file_key)
f_inst = getattr(f_obj, "file", None)
file_size = getattr(f_inst, "size", None)
if file_size == 0:
allow_empty_files = current_app.config.get(
"RECORDS_RESOURCES_ALLOW_EMPTY_FILES", True
)
if not allow_empty_files:
raise FileSizeError(description=_("Empty files are not accepted."))
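The dynamic schema union built in get_transfer_type_schema() above can be illustrated with a standalone marshmallow example; both schema classes below are invented for the illustration:

import marshmallow as ma


class FileSchema(ma.Schema):
    """Stand-in for the service's default file schema."""

    key = ma.fields.Str(required=True)


class MultipartSchema(ma.Schema):
    """Hypothetical transfer-type schema contributing extra fields."""

    parts = ma.fields.Int(required=True)
    part_size = ma.fields.Int()


# Mirrors the type(...) call in get_transfer_type_schema(): the resulting class
# validates the default fields plus the transfer-specific ones.
CombinedSchema = type("FileSchemaTransferM", (MultipartSchema, FileSchema), {})

print(CombinedSchema().load({"key": "data.zip", "parts": 3, "part_size": 10_000_000}))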