diff --git a/invenio_records_resources/services/records/service.py b/invenio_records_resources/services/records/service.py index 1d816b51..b76f3ca8 100644 --- a/invenio_records_resources/services/records/service.py +++ b/invenio_records_resources/services/records/service.py @@ -17,6 +17,7 @@ from invenio_search import current_search_client from invenio_search.engine import dsl from kombu import Queue +from marshmallow import ValidationError from werkzeug.local import LocalProxy from invenio_records_resources.services.errors import ( @@ -26,8 +27,9 @@ from ..base import LinksTemplate, Service from ..errors import RevisionIdMismatchError -from ..uow import RecordCommitOp, RecordDeleteOp, unit_of_work +from ..uow import RecordBulkCommitOp, RecordCommitOp, RecordDeleteOp, unit_of_work from .schema import ServiceSchemaWrapper +from sqlalchemy.orm.exc import NoResultFound class RecordIndexerMixin: @@ -579,3 +581,65 @@ def on_relation_update( self.reindex(identity, search_query=search_query) return True + + + @unit_of_work() + def create_update_many(self, identity, data, revision_id=None, uow=None, expand=False): + """Replace many records.""" + records_to_update = [] + records_to_create = [] + errors = [] + for record_id, record_data in data: + try: + record = self.record_cls.pid.resolve(record_id) + self.check_revision_id(record, revision_id) + + # Permissions + self.require_permission(identity, "update", record=record) + try: + record_data, _ = self.schema.load( + record_data, context=dict(identity=identity, pid=record.pid, record=record) + ) + except ValidationError as err: + errors.append(dict(record=record_data, errors=err.messages)) + continue + # TODO add other error handling when adding tests + + # Run components + self.run_components("update", identity, data=record_data, record=record, uow=uow) + # Update + + records_to_update.append(record) + except NoResultFound: + try: + self.require_permission(identity, "create") + + # Validate data and create record with pid + record_data, schema_errors = self.schema.load( + record_data, + context={"identity": identity} + ) + except ValidationError as err: + errors.append(dict(record=record_data, errors=err.messages)) + continue + # TODO add other error handling when adding tests + + # It's the components who saves the actual data in the record. + record = self.record_cls.create({}) + + # Run components + self.run_components( + "create", + identity, + data=record_data, + record=record, + errors=schema_errors, + uow=uow, + ) + + records_to_create.append(record) + + records = records_to_update + records_to_create + uow.register(RecordBulkCommitOp(records, self.indexer)) + + return records, errors diff --git a/invenio_records_resources/services/uow.py b/invenio_records_resources/services/uow.py index 32b5e3be..72dc267c 100644 --- a/invenio_records_resources/services/uow.py +++ b/invenio_records_resources/services/uow.py @@ -182,6 +182,29 @@ def on_commit(self, uow): arguments = {"refresh": True} if self._index_refresh else {} self._indexer.index(self._record, arguments=arguments) +class RecordBulkCommitOp(Operation): + """Record bulk commit operation with indexing.""" + + def __init__(self, records, indexer=None, index_refresh=False): + """Initialize the bulk record commit operation.""" + self._records = records + self._indexer = indexer + self._index_refresh = index_refresh + + + def on_register(self, uow): + """Save objects to the session.""" + for record in self._records: + json = record._validate() + record.model.json = json + uow.session.merge(record.model) + # TODO find a better way to save the record + + def on_commit(self, uow): + """Run the operation.""" + if self._indexer is not None: + record_ids = [record.id for record in self._records] + self._indexer.bulk_index(record_ids) class RecordIndexOp(RecordCommitOp): """Record indexing operation."""