Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Transfer registry #568

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions invenio_records_resources/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from . import config
from .registry import NotificationRegistry, ServiceRegistry
from .services.files.transfer.registry import TransferRegistry


class InvenioRecordsResources(object):
Expand All @@ -25,10 +26,21 @@ def init_app(self, app):
self.init_config(app)
self.registry = ServiceRegistry()
self.notification_registry = NotificationRegistry()
self.transfer_registry = TransferRegistry()
self.register_builtin_transfers()
app.extensions["invenio-records-resources"] = self

def init_config(self, app):
"""Initialize configuration."""
for k in dir(config):
if k.startswith("RECORDS_RESOURCES_") or k.startswith("SITE_"):
app.config.setdefault(k, getattr(config, k))

def register_builtin_transfers(self):
from invenio_records_resources.services.files.transfer import (
FetchTransfer,
LocalTransfer,
)

self.transfer_registry.register(LocalTransfer)
self.transfer_registry.register(FetchTransfer)
4 changes: 4 additions & 0 deletions invenio_records_resources/proxies.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@
lambda: current_app.extensions["invenio-records-resources"].notification_registry
)
"""Helper proxy to get the current notifications registry."""

current_transfer_registry = LocalProxy(
lambda: current_app.extensions["invenio-records-resources"].transfer_registry
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

"""Files service components."""
from ...errors import FailedFileUploadException, TransferException
from ..transfer import Transfer
from .base import FileServiceComponent
from ....proxies import current_transfer_registry


class FileContentComponent(FileServiceComponent):
Expand All @@ -23,8 +23,7 @@ def set_file_content(self, identity, id, file_key, stream, content_length, recor
if file_record is None:
raise Exception(f'File with key "{file_key}" has not been initialized yet.')

file_type = file_record.file.storage_class if file_record.file else None
transfer = Transfer.get_transfer(file_type)
transfer = current_transfer_registry.get_transfer(file_record=file_record)
try:
transfer.set_file_content(
record, file_record.file, file_key, stream, content_length
Expand Down
37 changes: 30 additions & 7 deletions invenio_records_resources/services/files/components/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@

from copy import deepcopy

from flask_babel import gettext as _
from flask import current_app
from invenio_files_rest.errors import FileSizeError

from ...errors import FilesCountExceededException
from ..schema import InitFileSchema
from ..transfer import Transfer
from .base import FileServiceComponent
from ....proxies import current_transfer_registry


class FileMetadataComponent(FileServiceComponent):
Expand All @@ -38,18 +42,37 @@ def init_files(self, identity, id, record, data):

for file_metadata in validated_data:
temporary_obj = deepcopy(file_metadata)
file_type = temporary_obj.pop("storage_class", None)
transfer = Transfer.get_transfer(
file_type, service=self.service, uow=self.uow
)
transfer_type = temporary_obj.pop("storage_class", None)

transfer = current_transfer_registry.get_transfer(
transfer_type=transfer_type,
service=self.service,
uow=self.uow)

_ = transfer.init_file(record, temporary_obj)

def update_file_metadata(self, identity, id, file_key, record, data):
"""Update file metadata handler."""
# FIXME: move this call to a transfer call
record.files.update(file_key, data=data)

# TODO: `commit_file` might vary based on your storage backend (e.g. S3)
def commit_file(self, identity, id, file_key, record):
"""Commit file handler."""
Transfer.commit_file(record, file_key)

transfer = current_transfer_registry.get_transfer(
record=record,
file_record=record.files.get(file_key),
service=self.service,
uow=self.uow)

transfer.commit_file()

f_obj = record.files.get(file_key)
f_inst = getattr(f_obj, "file", None)
file_size = getattr(f_inst, "size", None)
if file_size == 0:
allow_empty_files = current_app.config.get(
"RECORDS_RESOURCES_ALLOW_EMPTY_FILES", True
)
if not allow_empty_files:
raise FileSizeError(description=_("Empty files are not accepted."))
6 changes: 3 additions & 3 deletions invenio_records_resources/services/files/generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from invenio_records_permissions.generators import Generator
from invenio_search.engine import dsl

from .transfer import TransferType
from .transfer import LOCAL_TRANSFER_TYPE


class AnyUserIfFileIsLocal(Generator):
Expand All @@ -28,12 +28,12 @@ def needs(self, **kwargs):
file_record = record.files.get(file_key)
# file_record __bool__ returns false for `if file_record`
file = file_record.file if file_record is not None else None
is_file_local = not file or file.storage_class == TransferType.LOCAL
is_file_local = not file or file.storage_class == LOCAL_TRANSFER_TYPE
else:
file_records = record.files.entries
for file_record in file_records:
file = file_record.file
if file and file.storage_class != TransferType.LOCAL:
if file and file.storage_class != LOCAL_TRANSFER_TYPE:
is_file_local = False
break

Expand Down
17 changes: 16 additions & 1 deletion invenio_records_resources/services/files/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from ..base import ServiceListResult
from ..records.results import RecordItem
from ...proxies import current_transfer_registry


class FileItem(RecordItem):
Expand Down Expand Up @@ -92,8 +93,22 @@ def entries(self):
identity=self._identity,
),
)

# create links
if self._links_item_tpl:
projection["links"] = self._links_item_tpl.expand(self._identity, entry)
links = self._links_item_tpl.expand(self._identity, entry)
else:
links = {}

# add transfer links
transfer = current_transfer_registry.get_transfer(file_record=entry)
for k, v in transfer.expand_links(self._identity, entry).items():
if v is not None:
links[k] = v
else:
links.pop(k, None)

projection["links"] = links

yield projection

Expand Down
69 changes: 28 additions & 41 deletions invenio_records_resources/services/files/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# details.

"""File schema."""

import typing
from datetime import timezone
from urllib.parse import urlparse

Expand All @@ -19,13 +19,17 @@
Schema,
ValidationError,
pre_dump,
post_dump,
validate,
validates,
)
from marshmallow.fields import UUID, Dict, Integer, Str
from marshmallow.schema import _T
from marshmallow.utils import missing
from marshmallow_utils.fields import GenMethod, Links, SanitizedUnicode, TZDateTime

from .transfer import TransferType
from .transfer import BaseTransfer
from ...proxies import current_transfer_registry


class InitFileSchema(Schema):
Expand Down Expand Up @@ -73,62 +77,45 @@ def validate_names(self, value):
if domain not in allowed_domains:
raise ValidationError("Domain not allowed", field_name="uri")

@pre_dump(pass_many=False)
def fields_from_file_obj(self, data, **kwargs):
"""Fields coming from the FileInstance model."""
# this cannot be implemented as fields.Method since those receive the already
# dumped data. it could not be access to data.file.
# using data_key and attribute from marshmallow did not work as expected.

# data is a FileRecord instance, might not have a file yet.
# data.file is a File wrapper object.
if data.file:
# mandatory fields
data["storage_class"] = data.file.storage_class
data["uri"] = data.file.uri

# If Local -> remove uri as it contains internal file storage info
if not TransferType(data["storage_class"]).is_serializable():
data.pop("uri")

# optional fields
fields = ["checksum", "size"]
for field in fields:
value = getattr(data.file, field, None)
if value is not None:
data[field] = value

return data
def dump(self, obj: typing.Any, *, many = None, **kwargs):
raise Exception("InitFileSchema should not be used for dumping.")


class FileSchema(InitFileSchema):
class FileSchema(Schema):
"""Service schema for files."""

class Meta:
"""Meta."""

unknown = RAISE

key = Str(required=True)

created = TZDateTime(timezone=timezone.utc, format="iso", dump_only=True)
updated = TZDateTime(timezone=timezone.utc, format="iso", dump_only=True)

status = GenMethod("dump_status")
metadata = Dict(dump_only=True)
mimetype = Str(dump_only=True, attribute="file.mimetype")
checksum = Str(dump_only=True, attribute="file.checksum")
size = Integer(dump_only=True, attribute="file.size")

storage_class = Str(dump_only=True, attribute="file.storage_class")

version_id = UUID(attribute="file.version_id")
file_id = UUID(attribute="file.file_id")
bucket_id = UUID(attribute="file.bucket_id")

links = Links()

def dump_status(self, obj):
"""Dump file status."""
# due to time constraints the status check is done here
# however, ideally this class should not need knowledge of
# the TransferType class, it should be encapsulated at File
# wrapper class or lower.
has_file = obj.file is not None
if has_file and TransferType(obj.file.storage_class).is_completed:
return "completed"

return "pending"
# comes from transfer_data
# status = Str()
# uri = Str()

@post_dump(pass_many=False, pass_original=True)
def _dump_transfer_data(self, data, original_data, **kwargs):
"""
Enriches the dumped data with the transfer data.
"""
transfer = current_transfer_registry.get_transfer(file_record=original_data)
data |= transfer.transfer_data
return data
Loading