Skip to content

Commit

Permalink
service: add create update many
Browse files Browse the repository at this point in the history
  • Loading branch information
jrcastro2 committed Jul 16, 2024
1 parent 6c484cb commit e086008
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 1 deletion.
66 changes: 65 additions & 1 deletion invenio_records_resources/services/records/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from invenio_search import current_search_client
from invenio_search.engine import dsl
from kombu import Queue
from marshmallow import ValidationError
from werkzeug.local import LocalProxy

from invenio_records_resources.services.errors import (
Expand All @@ -26,8 +27,9 @@

from ..base import LinksTemplate, Service
from ..errors import RevisionIdMismatchError
from ..uow import RecordCommitOp, RecordDeleteOp, unit_of_work
from ..uow import RecordBulkCommitOp, RecordCommitOp, RecordDeleteOp, unit_of_work
from .schema import ServiceSchemaWrapper
from sqlalchemy.orm.exc import NoResultFound


class RecordIndexerMixin:
Expand Down Expand Up @@ -579,3 +581,65 @@ def on_relation_update(

self.reindex(identity, search_query=search_query)
return True


@unit_of_work()
def create_update_many(self, identity, data, revision_id=None, uow=None, expand=False):
"""Replace many records."""
records_to_update = []
records_to_create = []
errors = []
for record_id, record_data in data:
try:
record = self.record_cls.pid.resolve(record_id)
self.check_revision_id(record, revision_id)

# Permissions
self.require_permission(identity, "update", record=record)
try:
record_data, _ = self.schema.load(
record_data, context=dict(identity=identity, pid=record.pid, record=record)
)
except ValidationError as err:
errors.append(dict(record=record_data, errors=err.messages))
continue
# TODO add other error handling when adding tests

# Run components
self.run_components("update", identity, data=record_data, record=record, uow=uow)
# Update

records_to_update.append(record)
except NoResultFound:
try:
self.require_permission(identity, "create")

# Validate data and create record with pid
record_data, schema_errors = self.schema.load(
record_data,
context={"identity": identity}
)
except ValidationError as err:
errors.append(dict(record=record_data, errors=err.messages))
continue
# TODO add other error handling when adding tests

# It's the components who saves the actual data in the record.
record = self.record_cls.create({})

# Run components
self.run_components(
"create",
identity,
data=record_data,
record=record,
errors=schema_errors,
uow=uow,
)

records_to_create.append(record)

records = records_to_update + records_to_create
uow.register(RecordBulkCommitOp(records, self.indexer))

return records, errors
23 changes: 23 additions & 0 deletions invenio_records_resources/services/uow.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,29 @@ def on_commit(self, uow):
arguments = {"refresh": True} if self._index_refresh else {}
self._indexer.index(self._record, arguments=arguments)

class RecordBulkCommitOp(Operation):
"""Record bulk commit operation with indexing."""

def __init__(self, records, indexer=None, index_refresh=False):
"""Initialize the bulk record commit operation."""
self._records = records
self._indexer = indexer
self._index_refresh = index_refresh


def on_register(self, uow):
"""Save objects to the session."""
for record in self._records:
json = record._validate()
record.model.json = json
uow.session.merge(record.model)
# TODO find a better way to save the record

def on_commit(self, uow):
"""Run the operation."""
if self._indexer is not None:
record_ids = [record.id for record in self._records]
self._indexer.bulk_index(record_ids)

class RecordIndexOp(RecordCommitOp):
"""Record indexing operation."""
Expand Down

0 comments on commit e086008

Please sign in to comment.