Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODINVSTOR-1283 Modify endpoint for bulk instances upsert with publish events flag #1112

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
## v28.0.0 In progress
## v28.1.0 YYYY-mm-DD
### Breaking changes
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))

### New APIs versions
* Provides `API_NAME vX.Y`
* Requires `API_NAME vX.Y`

### Features
* Modify endpoint for bulk instances upsert with publish events flag ([MODINVSTOR-1283](https://folio-org.atlassian.net/browse/MODINVSTOR-1283))

### Bug fixes
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))

### Tech Dept
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))

### Dependencies
* Bump `LIB_NAME` from `OLD_VERSION` to `NEW_VERSION`
* Add `LIB_NAME VERSION`
* Remove `LIB_NAME`

---

## v28.0.0 2024-11-01
### Breaking changes
* Migrate "publicationPeriod" data to the Dates object and remove it from the Instance schema ([MODINVSTOR-1232](https://folio-org.atlassian.net/browse/MODINVSTOR-1232))
* Delete deprecated `shelf-locations` API ([MODINVSTOR-1183](https://folio-org.atlassian.net/browse/MODINVSTOR-1183))
Expand Down
5 changes: 5 additions & 0 deletions ramls/bulkUpsertRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
"recordsFileName": {
"description": "File name of entities records",
"type": "string"
},
"publishEvents": {
"description": "A flag that indicates whether domain events should be published.",
"type": "boolean",
"default": true
}
},
"required": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void postInstanceStorageBatchSynchronous(boolean upsert, InstancesPost en
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), upsert, true)
.createInstances(instances.getInstances(), upsert, true, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void postInstanceStorageBatchSynchronousUnsafe(InstancesPost entity, Map<
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), true, false)
.createInstances(instances.getInstances(), true, false, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/folio/services/BulkProcessingContext.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.services;

import org.apache.commons.lang3.StringUtils;
import org.folio.rest.jaxrs.model.BulkUpsertRequest;

/**
* Encapsulates the context for bulk processing of entities from external file.
Expand All @@ -17,13 +18,15 @@ public class BulkProcessingContext {
private final String errorsFilePath;
private final String errorEntitiesFileLocalPath;
private final String errorsFileLocalPath;
private final boolean publishEvents;

public BulkProcessingContext(String entitiesFilePath) {
this.initialFilePath = StringUtils.removeStart(entitiesFilePath, '/');
public BulkProcessingContext(BulkUpsertRequest request) {
this.initialFilePath = StringUtils.removeStart(request.getRecordsFileName(), '/');
this.errorEntitiesFilePath = initialFilePath + FAILED_ENTITIES_FILE_SUFFIX;
this.errorsFilePath = initialFilePath + ERRORS_FILE_SUFFIX;
this.errorEntitiesFileLocalPath = ROOT_FOLDER + errorEntitiesFilePath;
this.errorsFileLocalPath = ROOT_FOLDER + errorsFilePath;
this.publishEvents = request.getPublishEvents();
}

public String getErrorEntitiesFilePath() {
Expand All @@ -42,4 +45,7 @@ public String getErrorsFileLocalPath() {
return errorsFileLocalPath;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ public final class BatchOperationContext<T> {
*/
private final Collection<T> existingRecords;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords) {
private final boolean publishEvents;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords, boolean publishEvents) {
this.recordsToBeCreated = unmodifiableCollection(recordsToBeCreated);
this.existingRecords = unmodifiableCollection(existingRecords);
this.publishEvents = publishEvents;
}

public Collection<T> getRecordsToBeCreated() {
Expand All @@ -23,4 +26,8 @@ public Collection<T> getRecordsToBeCreated() {
public Collection<T> getExistingRecords() {
return existingRecords;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
public final class BatchOperationContextFactory {
private BatchOperationContextFactory() { }

public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(
boolean upsert, List<T> all, AbstractRepository<T> repository, Function<T, String> idGetter) {
public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(boolean upsert, List<T> all,
AbstractRepository<T> repository,
Function<T, String> idGetter,
boolean publishEvents) {

if (!upsert) {
return succeededFuture(new BatchOperationContext<>(all, emptyList()));
return succeededFuture(new BatchOperationContext<>(all, emptyList(), publishEvents));
}

return repository.getById(all, idGetter).map(found -> {
final var toBeCreated = all.stream()
.filter(entity -> !found.containsKey(idGetter.apply(entity)))
.collect(toList());

return new BatchOperationContext<>(toBeCreated, found.values());
return new BatchOperationContext<>(toBeCreated, found.values(), publishEvents);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ private Future<List<T>> loadEntities(BulkUpsertRequest bulkRequest) {
}

private Future<BulkUpsertResponse> upsertEntities(List<T> entities, BulkUpsertRequest bulkRequest) {
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest.getRecordsFileName());
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest);

return upsert(entities)
return upsert(entities, bulkProcessingContext.isPublishEvents())
.map(v -> new BulkUpsertResponse().withErrorsNumber(0))
.recover(e -> processSequentially(entities, bulkProcessingContext));
}
Expand All @@ -89,7 +89,7 @@ private Future<BulkUpsertResponse> processSequentially(List<T> entities, BulkPro
BulkProcessingErrorFileWriter errorsWriter = new BulkProcessingErrorFileWriter(vertx, bulkContext);

return errorsWriter.initialize()
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity))
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity), bulkContext.isPublishEvents())
.recover(e -> handleUpsertFailure(errorsCounter, errorsWriter, entity, e))))
.eventually(errorsWriter::close)
.eventually(() -> uploadErrorsFiles(bulkContext))
Expand Down Expand Up @@ -166,10 +166,11 @@ private Future<Void> uploadErrorsFiles(BulkProcessingContext bulkContext) {
* Performs an upsert operation on specified list of {@code entities}.
* The implementation of the upsert operation depends on the specifics of the {@code <T>} type of entity.
*
* @param entities - a list of entities to be updated or created
* @param entities - a list of entities to be updated or created
* @param publishEvents - a flag that indicates whether domain events should be published
* @return Future of Void, succeeded if the upsert operation is successful, otherwise failed
*/
protected abstract Future<Void> upsert(List<T> entities);
protected abstract Future<Void> upsert(List<T> entities, boolean publishEvents);

/**
* Provides a representation of the given {@code entity} to be written to error file containing entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ protected Future<Void> ensureEntitiesWithNonMarcControlledFieldsData(List<Instan
}

@Override
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers) {
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers, boolean publishEvents) {
List<Instance> instances = instanceWrappers.stream().map(InstanceWrapper::instance).toList();

return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING)
return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING, publishEvents)
.compose(response -> {
if (!isCreateSuccessResponse(response)) {
String msg = String.format("Failed to update instances, status: '%s', message: '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ public Handler<Response> publishCreatedOrUpdated(BatchOperationContext<D> batchO
log.info("Records created {}, records updated {}", batchOperation.getRecordsToBeCreated().size(),
batchOperation.getExistingRecords().size());

publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
if (batchOperation.isPublishEvents()) {
publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean up
.compose(ar -> hridManager.populateHridForHoldings(holdings)
.compose(NotesValidators::refuseHoldingLongNotes)
.compose(result -> buildBatchOperationContext(upsert, holdings,
holdingsRepository, HoldingsRecord::getId))
holdingsRepository, HoldingsRecord::getId, true))
.compose(batchOperation -> postSync(HOLDINGS_RECORD_TABLE, holdings, MAX_ENTITIES,
upsert, optimisticLocking, okapiHeaders, vertxContext, PostHoldingsStorageBatchSynchronousResponse.class)
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,15 @@ public Future<Response> createInstance(Instance entity) {
.map(ResponseHandlerUtil::handleHridError);
}

public Future<Response> createInstances(List<Instance> instances, boolean upsert, boolean optimisticLocking) {
public Future<Response> createInstances(List<Instance> instances, boolean upsert, boolean optimisticLocking,
boolean publishEvents) {
final String statusUpdatedDate = generateStatusUpdatedDate();
instances.forEach(instance -> instance.setStatusUpdatedDate(statusUpdatedDate));

return hridManager.populateHridForInstances(instances)
.compose(NotesValidators::refuseInstanceLongNotes)
.compose(notUsed -> buildBatchOperationContext(upsert, instances,
instanceRepository, Instance::getId))
instanceRepository, Instance::getId, publishEvents))
.compose(batchOperation ->
// Can use instances list here directly because the class is stateful
postSync(INSTANCE_TABLE, instances, MAX_ENTITIES, upsert, optimisticLocking, okapiHeaders,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/folio/services/item/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Future<Response> createItems(List<Item> items, boolean upsert, boolean op
.compose(NotesValidators::refuseItemLongNotes)
.compose(result -> effectiveValuesService.populateEffectiveValues(items))
.compose(this::populateCirculationNoteId)
.compose(result -> buildBatchOperationContext(upsert, items, itemRepository, Item::getId))
.compose(result -> buildBatchOperationContext(upsert, items, itemRepository, Item::getId, true))
.compose(batchOperation -> postSync(ITEM_TABLE, items, MAX_ENTITIES, upsert, optimisticLocking,
okapiHeaders, vertxContext, PostItemStorageBatchSynchronousResponse.class)
.onSuccess(domainEventService.publishCreatedOrUpdated(batchOperation)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.Assert.assertEquals;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;

import io.vertx.core.json.Json;
Expand Down Expand Up @@ -104,58 +104,28 @@ public static void setUpClass() {
s3Client.createBucketIfNotExists();
}

@AfterClass
public static void tearDownClass() {
localStackContainer.close();
}

@Before
public void setUp() {
StorageTestSuite.deleteAll(TENANT_ID, PRECEDING_SUCCEEDING_TITLE_TABLE);
clearData();
removeAllEvents();
}

@AfterClass
public static void tearDownClass() {
localStackContainer.close();
}

@Test
public void shouldUpdateInstancesWithoutErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
// given
List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_PATH));
String bulkFilePath = s3Client.write(BULK_INSTANCES_PATH, inputStream);

final IndividualResource existingInstance1 = createInstance(buildInstance(instancesIds.get(0), INSTANCE_TITLE_1));
final IndividualResource existingInstance2 = createInstance(buildInstance(instancesIds.get(1), INSTANCE_TITLE_2));

PrecedingSucceedingTitle precedingSucceedingTitle1 = new PrecedingSucceedingTitle(
existingInstance2.getId().toString(), null, "Houston oil directory", null, null);
precedingSucceedingTitleClient.create(precedingSucceedingTitle1.getJson());
PrecedingSucceedingTitle precedingSucceedingTitle2 = new PrecedingSucceedingTitle(
existingInstance2.getId().toString(), null, "International trade statistics", null, null);
precedingSucceedingTitleClient.create(precedingSucceedingTitle2.getJson());

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest().withRecordsFileName(bulkFilePath));

// then
assertThat(bulkResponse.getErrorsNumber(), is(0));
assertThat(bulkResponse.getErrorRecordsFileName(), nullValue());
assertThat(bulkResponse.getErrorsFileName(), nullValue());

JsonObject updatedInstance1 = getInstanceById(existingInstance1.getId().toString());
JsonObject updatedInstance2 = getInstanceById(existingInstance2.getId().toString());
assertNotControlledByMarcFields(existingInstance1.getJson(), updatedInstance1);
assertNotControlledByMarcFields(existingInstance2.getJson(), updatedInstance2);

List<JsonObject> updatedTitles = getPrecedingSucceedingTitlesByInstanceId(existingInstance2.getId());
updatedTitles.forEach(titleJson -> {
assertThat(titleJson.getString("succeedingInstanceId"), equalTo(existingInstance2.getId().toString()));
assertThat(titleJson.getString("precedingInstanceId"), nullValue());
assertThat(titleJson.getString("title"), notNullValue());
});
shouldUpdateInstances(true);
}

instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.updatedMessagePublished(existingInstance2.getJson(), updatedInstance2);
@Test
public void shouldUpdateInstancesWithoutErrorsAndDoNotPublishDomainEvents()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
shouldUpdateInstances(false);
}

@Test
Expand All @@ -173,7 +143,9 @@ public void shouldUpdateInstancesWithErrors()
final IndividualResource existingInstance2 = createInstance(buildInstance(instancesIds.get(1), INSTANCE_TITLE_2));

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest().withRecordsFileName(bulkFilePath));
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest()
.withRecordsFileName(bulkFilePath)
);

// then
assertThat(bulkResponse.getErrorsNumber(), is(1));
Expand All @@ -187,6 +159,7 @@ public void shouldUpdateInstancesWithErrors()
assertThat(errors.size(), is(1));

JsonObject updatedInstance1 = getInstanceById(existingInstance1.getId().toString());

instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}
Expand All @@ -199,6 +172,55 @@ public void shouldReturnUnprocessableEntityIfRecordsFileNameIsNotSpecified()
assertThat(response.getStatusCode(), is(HTTP_UNPROCESSABLE_ENTITY.toInt()));
}

private void shouldUpdateInstances(boolean publishEvents)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
// given
List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_PATH));
String bulkFilePath = s3Client.write(BULK_INSTANCES_PATH, inputStream);

final IndividualResource existingInstance1 = createInstance(buildInstance(instancesIds.get(0), INSTANCE_TITLE_1));
final IndividualResource existingInstance2 = createInstance(buildInstance(instancesIds.get(1), INSTANCE_TITLE_2));

PrecedingSucceedingTitle precedingSucceedingTitle1 = new PrecedingSucceedingTitle(
existingInstance2.getId().toString(), null, "Houston oil directory", null, null);
precedingSucceedingTitleClient.create(precedingSucceedingTitle1.getJson());
PrecedingSucceedingTitle precedingSucceedingTitle2 = new PrecedingSucceedingTitle(
existingInstance2.getId().toString(), null, "International trade statistics", null, null);
precedingSucceedingTitleClient.create(precedingSucceedingTitle2.getJson());

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest()
.withRecordsFileName(bulkFilePath)
.withPublishEvents(publishEvents)
);

// then
assertThat(bulkResponse.getErrorsNumber(), is(0));
assertThat(bulkResponse.getErrorRecordsFileName(), nullValue());
assertThat(bulkResponse.getErrorsFileName(), nullValue());

JsonObject updatedInstance1 = getInstanceById(existingInstance1.getId().toString());
JsonObject updatedInstance2 = getInstanceById(existingInstance2.getId().toString());
assertNotControlledByMarcFields(existingInstance1.getJson(), updatedInstance1);
assertNotControlledByMarcFields(existingInstance2.getJson(), updatedInstance2);

List<JsonObject> updatedTitles = getPrecedingSucceedingTitlesByInstanceId(existingInstance2.getId());
updatedTitles.forEach(titleJson -> {
assertThat(titleJson.getString("succeedingInstanceId"), equalTo(existingInstance2.getId().toString()));
assertThat(titleJson.getString("precedingInstanceId"), nullValue());
assertThat(titleJson.getString("title"), notNullValue());
});

if (publishEvents) {
instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.updatedMessagePublished(existingInstance2.getJson(), updatedInstance2);
} else {
instanceMessageChecks.noUpdatedMessagePublished(existingInstance1.getId().toString());
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}
}

private List<String> extractInstancesIdsFromFile(String bulkInstancesFilePath) throws IOException {
return Files.readAllLines(Path.of(bulkInstancesFilePath))
.stream()
Expand Down
Loading
Loading