diff --git a/invenio_records_resources/services/base/config.py b/invenio_records_resources/services/base/config.py index ebedd732..fea60535 100644 --- a/invenio_records_resources/services/base/config.py +++ b/invenio_records_resources/services/base/config.py @@ -12,7 +12,12 @@ from invenio_base.utils import load_or_import_from_config from invenio_records_permissions.policies import BasePermissionPolicy -from .results import ServiceItemResult, ServiceListResult +from .results import ( + ServiceBulkItemResult, + ServiceBulkListResult, + ServiceItemResult, + ServiceListResult, +) # # Service @@ -27,6 +32,8 @@ class ServiceConfig: permission_policy_cls = BasePermissionPolicy result_item_cls = ServiceItemResult result_list_cls = ServiceListResult + result_bulk_item_cls = ServiceBulkItemResult + result_bulk_list_cls = ServiceBulkListResult # diff --git a/invenio_records_resources/services/base/results.py b/invenio_records_resources/services/base/results.py index a7ad9214..14334eee 100644 --- a/invenio_records_resources/services/base/results.py +++ b/invenio_records_resources/services/base/results.py @@ -20,3 +20,11 @@ class ServiceItemResult(ServiceResult): class ServiceListResult(ServiceResult): """Base class for a service result for a list of items.""" + + +class ServiceBulkItemResult(ServiceResult): + """Base class for a service result for a single item performed on a bulk operation.""" + + +class ServiceBulkListResult(ServiceResult): + """Base class for a service result for a list of items performed on a bulk operation.""" diff --git a/invenio_records_resources/services/base/service.py b/invenio_records_resources/services/base/service.py index e5f877d8..9ec73277 100644 --- a/invenio_records_resources/services/base/service.py +++ b/invenio_records_resources/services/base/service.py @@ -99,3 +99,21 @@ def result_list(self, *args, **kwargs): a Service must provide. """ return self.config.result_list_cls(*args, **kwargs) + + def result_bulk_item(self, *args, **kwargs): + """Create a new instance of the bulk resource unit. + + A bulk resource unit is an instantiated object representing one unit + of a Resource. It is what a bulk Resource unit methods transact in + and therefore what a Service must provide. + """ + return self.config.result_bulk_item_cls(*args, **kwargs) + + def result_bulk_list(self, *args, **kwargs): + """Create a new instance of the bulk resource list. + + A bulk resource list is an instantiated object representing a grouping + of Resource units. It is what a bulk Resource list methods transact in + and therefore what a Service must provide. + """ + return self.config.result_bulk_list_cls(*args, **kwargs) diff --git a/invenio_records_resources/services/records/config.py b/invenio_records_resources/services/records/config.py index c24d8f12..995a74cb 100644 --- a/invenio_records_resources/services/records/config.py +++ b/invenio_records_resources/services/records/config.py @@ -20,7 +20,7 @@ from .components import MetadataComponent from .links import RecordLink, pagination_links from .params import FacetsParam, PaginationParam, QueryParser, QueryStrParam, SortParam -from .results import RecordItem, RecordList +from .results import RecordBulkItem, RecordBulkList, RecordItem, RecordList class SearchOptions: @@ -58,6 +58,8 @@ class RecordServiceConfig(ServiceConfig): permission_policy_cls = RecordPermissionPolicy result_item_cls = RecordItem result_list_cls = RecordList + result_bulk_item_cls = RecordBulkItem + result_bulk_list_cls = RecordBulkList # Record specific configuration record_cls = Record diff --git a/invenio_records_resources/services/records/results.py b/invenio_records_resources/services/records/results.py index 82ab4e33..be0b3b76 100644 --- a/invenio_records_resources/services/records/results.py +++ b/invenio_records_resources/services/records/results.py @@ -14,6 +14,11 @@ from invenio_access.permissions import system_user_id from invenio_records.dictutils import dict_lookup, dict_merge, dict_set +from invenio_records_resources.services.base.results import ( + ServiceBulkItemResult, + ServiceBulkListResult, +) + from ...pagination import Pagination from ..base import ServiceItemResult, ServiceListResult @@ -254,6 +259,62 @@ def to_dict(self): return res +class RecordBulkItem(ServiceBulkItemResult): + """Record bulk item.""" + + def __init__(self, op_type, record, errors, exc): + """Constructor.""" + self._op_type = op_type + self._record = record + self._errors = errors + self._exc = exc + + @property + def op_type(self): + """Get operation type.""" + return self._op_type + + @property + def record(self): + """Get record.""" + return self._record + + @property + def errors(self): + """Get errors.""" + return self._errors + + @property + def exc(self): + """Get exception.""" + return self._exc + + +class RecordBulkList(ServiceBulkListResult): + """List of records result.""" + + def __init__( + self, + service, + identity, + results, + ): + """Constructor. + + :params service: a service instance + :params identity: an identity that performed the service request + :params results: the results of the bulk operation + """ + self._identity = identity + self._service = service + self._results = [RecordBulkItem(*r) for r in results] + + @property + def results(self): + """Iterator over the results.""" + return iter(self._results) + + class ExpandableField(ABC): """Field referencing to another record that can be expanded.""" diff --git a/invenio_records_resources/services/records/service.py b/invenio_records_resources/services/records/service.py index e39ef138..98bd3a84 100644 --- a/invenio_records_resources/services/records/service.py +++ b/invenio_records_resources/services/records/service.py @@ -591,63 +591,75 @@ def create_or_update_many(self, identity, data, uow=None): :param identity: The user identity performing the operation. :param data: A list of tuples containing the record ID and record data. :param uow: The unit of work to register the record operations. Defaults to None. - - Returns: - list: A list of tuples containing the operation type ('create' or 'update'), the processed record or the record dict, and any schema errors encountered. """ records_processed = [] - for record_id, record_dict in data: - try: - record = self.record_cls.pid.resolve(record_id) - # Permissions - self.require_permission(identity, "update", record=record) - record_data, schema_errors = self.schema.load( - record_dict, - context=dict(identity=identity, pid=record.pid, record=record), - raise_errors=False, - ) + def _create_record(record_dict, record): + """Create a record.""" + # Permissions + self.require_permission(identity, "update", record=record) + record_data, schema_errors = self.schema.load( + record_dict, + context=dict(identity=identity, pid=record.pid, record=record), + raise_errors=False, + ) - # If errors we avoid creating the record - if schema_errors: - records_processed.append(("update", record_dict, schema_errors)) - continue + # If errors we avoid creating the record + if schema_errors: + records_processed.append(("update", record_dict, schema_errors, None)) + return - # Run components - self.run_components( - "update", identity, data=record_data, record=record, uow=uow - ) + # Run components + self.run_components( + "update", identity, data=record_data, record=record, uow=uow + ) - records_processed.append(("update", record, schema_errors)) - except PIDDoesNotExistError: - self.require_permission(identity, "create") + records_processed.append(("update", record, schema_errors, None)) - # Validate data and create record with pid - record_data, schema_errors = self.schema.load( - record_dict, context={"identity": identity}, raise_errors=False - ) + def _update_record(record_dict): + """Update a record.""" + self.require_permission(identity, "create") - # If errors we avoid creating the record - if schema_errors: - records_processed.append(("create", record_dict, schema_errors)) - continue - - # It's the components who saves the actual data in the record. - record = self.record_cls.create({}) - - # Run components - self.run_components( - "create", - identity, - data=record_data, - record=record, - errors=schema_errors, - uow=uow, - ) - records_processed.append(("create", record, schema_errors)) + # Validate data and create record with pid + record_data, schema_errors = self.schema.load( + record_dict, context={"identity": identity}, raise_errors=False + ) + + # If errors we avoid creating the record + if schema_errors: + records_processed.append(("create", record_dict, schema_errors, None)) + return + + # It's the components who saves the actual data in the record. + record = self.record_cls.create({}) + + # Run components + self.run_components( + "create", + identity, + data=record_data, + record=record, + errors=schema_errors, + uow=uow, + ) + records_processed.append(("create", record, schema_errors, None)) + + # We avoid using create and update methods to bulk index all records at once + for record_id, record_dict in data: + try: + record = self.record_cls.pid.resolve(record_id) + _create_record(record_dict, record) + except (NoResultFound, PIDDoesNotExistError): + _update_record(record_dict) + except Exception as exc: + records_processed.append(("create", record_dict, None, exc)) # We only commit records that have no errors - records = [record for _, record, errors in records_processed if errors == []] + records = [ + record + for _, record, errors, exc in records_processed + if errors == [] and exc is None + ] uow.register(RecordBulkCommitOp(records, self.indexer)) - return records_processed + return self.result_bulk_list(self, identity, records_processed) diff --git a/tests/services/test_service_create_update.py b/tests/services/test_service_create_update.py index e6ed480d..1feab33f 100644 --- a/tests/services/test_service_create_update.py +++ b/tests/services/test_service_create_update.py @@ -18,9 +18,13 @@ def test_create(app, service, identity_simple, input_data): """Create a record .""" data = [(None, input_data)] - records = service.create_or_update_many(identity_simple, data) - assert len(records) == 1 - op_type, record, errors = records[0] + result_list = service.create_or_update_many(identity_simple, data) + result_iterator = result_list.results + + result = next(result_iterator, None) + assert result is not None and next(result_iterator, None) is None + record, op_type, errors = result.record, result.op_type, result.errors + assert record.id assert errors == [] assert op_type == "create" @@ -35,9 +39,12 @@ def test_create(app, service, identity_simple, input_data): def test_create_multiple_records(app, service, identity_simple, input_data): """Create multiple records.""" data = [(None, input_data), (None, input_data)] - records = service.create_or_update_many(identity_simple, data) - assert len(records) == 2 - for op_type, record, errors in records: + result_list = service.create_or_update_many(identity_simple, data) + results = list(result_list.results) + assert len(results) == 2 + for record, op_type, errors in ( + (result.record, result.op_type, result.errors) for result in results + ): assert record.id assert errors == [] assert op_type == "create" @@ -57,9 +64,12 @@ def test_update_example_record(app, service, identity_simple, input_data): updated_data["metadata"]["title"] = "Updated Title" data = [(id_, updated_data)] - records = service.create_or_update_many(identity_simple, data) - assert len(records) == 1 - op_type, record, errors = records[0] + result_list = service.create_or_update_many(identity_simple, data) + result_iterator = result_list.results + result = next(result_iterator, None) + assert result is not None and next(result_iterator, None) is None + + record, op_type, errors = result.record, result.op_type, result.errors assert record.get("id") == id_ assert errors == [] assert op_type == "update" @@ -74,9 +84,12 @@ def test_create_and_update_mixed(app, service, identity_simple, input_data): updated_data["metadata"]["title"] = "Updated Title" data = [(id_, updated_data), (None, input_data)] - records = service.create_or_update_many(identity_simple, data) - assert len(records) == 2 - for op_type, record, errors in records: + result_list = service.create_or_update_many(identity_simple, data) + results = list(result_list.results) + assert len(results) == 2 + for record, op_type, errors in ( + (result.record, result.op_type, result.errors) for result in results + ): assert record.id assert errors == [] if op_type == "create": @@ -95,9 +108,12 @@ def test_create_with_validation_errors( ): """Create a record with validation errors.""" data = [(None, invalid_input_data)] - records = service.create_or_update_many(identity_simple, data) - assert len(records) == 1 - op_type, record, errors = records[0] + result_list = service.create_or_update_many(identity_simple, data) + result_iterator = result_list.results + result = next(result_iterator, None) + assert result is not None and next(result_iterator, None) is None + + record, op_type, errors = result.record, result.op_type, result.errors assert errors != [] assert op_type == "create" @@ -114,9 +130,12 @@ def test_update_with_validation_errors( id_ = item.id invalid_input_data["id"] = id_ data = [(id_, invalid_input_data)] - records = service.create_or_update_many(identity_simple, data) - assert len(records) == 1 - op_type, record, errors = records[0] + result_list = service.create_or_update_many(identity_simple, data) + result_iterator = result_list.results + result = next(result_iterator, None) + assert result is not None and next(result_iterator, None) is None + + record, op_type, errors = result.record, result.op_type, result.errors assert record.get("id") == id_ assert errors != [] assert op_type == "update" @@ -127,9 +146,10 @@ def test_multiple_records( ): """Create multiple records.""" data = [(None, input_data), (None, invalid_input_data)] - records = service.create_or_update_many(identity_simple, data) - assert len(records) == 2 - for op_type, record, errors in records: + result_list = service.create_or_update_many(identity_simple, data) + results = list(result_list.results) + assert len(results) == 2 + for record, errors in ((result.record, result.errors) for result in results): if errors: # Assert it failed to insert with pytest.raises(PIDDoesNotExistError):