Skip to content

Commit

Permalink
service: add ServiceBulkListResult
Browse files Browse the repository at this point in the history
* Adds ServiceBulkListResult and ServiceBulkItemResult
  • Loading branch information
jrcastro2 authored and slint committed Aug 1, 2024
1 parent f34b86a commit 2ac95be
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 93 deletions.
9 changes: 8 additions & 1 deletion invenio_records_resources/services/base/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
from invenio_base.utils import load_or_import_from_config
from invenio_records_permissions.policies import BasePermissionPolicy

from .results import ServiceItemResult, ServiceListResult
from .results import (
ServiceBulkItemResult,
ServiceBulkListResult,
ServiceItemResult,
ServiceListResult,
)

#
# Service
Expand All @@ -27,6 +32,8 @@ class ServiceConfig:
permission_policy_cls = BasePermissionPolicy
result_item_cls = ServiceItemResult
result_list_cls = ServiceListResult
result_bulk_item_cls = ServiceBulkItemResult
result_bulk_list_cls = ServiceBulkListResult


#
Expand Down
8 changes: 8 additions & 0 deletions invenio_records_resources/services/base/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ class ServiceItemResult(ServiceResult):

class ServiceListResult(ServiceResult):
"""Base class for a service result for a list of items."""


class ServiceBulkItemResult(ServiceResult):
"""Base class for a service result for a single item performed on a bulk operation."""


class ServiceBulkListResult(ServiceResult):
"""Base class for a service result for a list of items performed on a bulk operation."""
18 changes: 18 additions & 0 deletions invenio_records_resources/services/base/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,21 @@ def result_list(self, *args, **kwargs):
a Service must provide.
"""
return self.config.result_list_cls(*args, **kwargs)

def result_bulk_item(self, *args, **kwargs):
"""Create a new instance of the bulk resource unit.
A bulk resource unit is an instantiated object representing one unit
of a Resource. It is what a bulk Resource unit methods transact in
and therefore what a Service must provide.
"""
return self.config.result_bulk_item_cls(*args, **kwargs)

def result_bulk_list(self, *args, **kwargs):
"""Create a new instance of the bulk resource list.
A bulk resource list is an instantiated object representing a grouping
of Resource units. It is what a bulk Resource list methods transact in
and therefore what a Service must provide.
"""
return self.config.result_bulk_list_cls(*args, **kwargs)
4 changes: 3 additions & 1 deletion invenio_records_resources/services/records/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .components import MetadataComponent
from .links import RecordLink, pagination_links
from .params import FacetsParam, PaginationParam, QueryParser, QueryStrParam, SortParam
from .results import RecordItem, RecordList
from .results import RecordBulkItem, RecordBulkList, RecordItem, RecordList


class SearchOptions:
Expand Down Expand Up @@ -58,6 +58,8 @@ class RecordServiceConfig(ServiceConfig):
permission_policy_cls = RecordPermissionPolicy
result_item_cls = RecordItem
result_list_cls = RecordList
result_bulk_item_cls = RecordBulkItem
result_bulk_list_cls = RecordBulkList

# Record specific configuration
record_cls = Record
Expand Down
61 changes: 61 additions & 0 deletions invenio_records_resources/services/records/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
from invenio_access.permissions import system_user_id
from invenio_records.dictutils import dict_lookup, dict_merge, dict_set

from invenio_records_resources.services.base.results import (
ServiceBulkItemResult,
ServiceBulkListResult,
)

from ...pagination import Pagination
from ..base import ServiceItemResult, ServiceListResult

Expand Down Expand Up @@ -254,6 +259,62 @@ def to_dict(self):
return res


class RecordBulkItem(ServiceBulkItemResult):
"""Record bulk item."""

def __init__(self, op_type, record, errors, exc):
"""Constructor."""
self._op_type = op_type
self._record = record
self._errors = errors
self._exc = exc

@property
def op_type(self):
"""Get operation type."""
return self._op_type

@property
def record(self):
"""Get record."""
return self._record

@property
def errors(self):
"""Get errors."""
return self._errors

@property
def exc(self):
"""Get exception."""
return self._exc


class RecordBulkList(ServiceBulkListResult):
"""List of records result."""

def __init__(
self,
service,
identity,
results,
):
"""Constructor.
:params service: a service instance
:params identity: an identity that performed the service request
:params results: the results of the bulk operation
"""
self._identity = identity
self._service = service
self._results = [RecordBulkItem(*r) for r in results]

@property
def results(self):
"""Iterator over the results."""
return iter(self._results)


class ExpandableField(ABC):
"""Field referencing to another record that can be expanded."""

Expand Down
106 changes: 58 additions & 48 deletions invenio_records_resources/services/records/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,63 +591,73 @@ def create_or_update_many(self, identity, data, uow=None):
:param identity: The user identity performing the operation.
:param data: A list of tuples containing the record ID and record data.
:param uow: The unit of work to register the record operations. Defaults to None.
Returns:
list: A list of tuples containing the operation type ('create' or 'update'), the processed record or the record dict, and any schema errors encountered.
"""
records_processed = []
for record_id, record_dict in data:
try:
record = self.record_cls.pid.resolve(record_id)

# Permissions
self.require_permission(identity, "update", record=record)
record_data, schema_errors = self.schema.load(
record_dict,
context=dict(identity=identity, pid=record.pid, record=record),
raise_errors=False,
)
self.require_permission(identity, "create_or_update_many")

# If errors we avoid creating the record
if schema_errors:
records_processed.append(("update", record_dict, schema_errors))
continue
def _update_record(record_dict, record):
"""Update a record."""
record_data, schema_errors = self.schema.load(
record_dict,
context=dict(identity=identity, pid=record.pid, record=record),
raise_errors=False,
)

# Run components
self.run_components(
"update", identity, data=record_data, record=record, uow=uow
)
# If errors we avoid creating the record
if schema_errors:
records_processed.append(("update", record_dict, schema_errors, None))
return

records_processed.append(("update", record, schema_errors))
except PIDDoesNotExistError:
self.require_permission(identity, "create")
# Run components
self.run_components(
"update", identity, data=record_data, record=record, uow=uow
)

# Validate data and create record with pid
record_data, schema_errors = self.schema.load(
record_dict, context={"identity": identity}, raise_errors=False
)
records_processed.append(("update", record, schema_errors, None))

# If errors we avoid creating the record
if schema_errors:
records_processed.append(("create", record_dict, schema_errors))
continue

# It's the components who saves the actual data in the record.
record = self.record_cls.create({})

# Run components
self.run_components(
"create",
identity,
data=record_data,
record=record,
errors=schema_errors,
uow=uow,
)
records_processed.append(("create", record, schema_errors))
def _create_record(record_dict):
"""Create a record."""
# Validate data and create record with pid
record_data, schema_errors = self.schema.load(
record_dict, context={"identity": identity}, raise_errors=False
)

# If errors we avoid creating the record
if schema_errors:
records_processed.append(("create", record_dict, schema_errors, None))
return

# It's the components who saves the actual data in the record
record = self.record_cls.create({})

# Run components
self.run_components(
"create",
identity,
data=record_data,
record=record,
errors=schema_errors,
uow=uow,
)
records_processed.append(("create", record, schema_errors, None))

# We avoid using create and update methods to bulk index all records at once
for record_id, record_dict in data:
try:
record = self.record_cls.pid.resolve(record_id)
_update_record(record_dict, record)
except (NoResultFound, PIDDoesNotExistError):
_create_record(record_dict)
except Exception as exc:
records_processed.append(("create", record_dict, None, exc))

# We only commit records that have no errors
records = [record for _, record, errors in records_processed if errors == []]
records = [
record
for _, record, errors, exc in records_processed
if errors == [] and exc is None
]
uow.register(RecordBulkCommitOp(records, self.indexer))

return records_processed
return self.result_bulk_list(self, identity, records_processed)
Loading

0 comments on commit 2ac95be

Please sign in to comment.