[CIRCSTORE-521] Publish request batch event when requests are reordered #482

Merged (11 commits) on Aug 29, 2024
22 changes: 22 additions & 0 deletions ramls/request-queue-reordering.json
@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Requests batch update",
"description": "List of ids reordered requests",
"type": "object",
"properties": {
"instanceId": {
"description": "Instance ID of reordered requests",
"type": "string",
"$ref": "raml-util/schemas/uuid.schema"
},
"requestIds": {
"description": "Array of requests ids",
"type": "array",
"items": {
"type": "string",
"$ref": "raml-util/schemas/uuid.schema"
}
}
},
"additionalProperties": false
}
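
For reference, a payload matching this schema can be built from the generated RequestQueueReordering model (its fluent with* setters are used later in this PR) and serialized with Vert.x. A minimal sketch; the UUIDs are purely illustrative:

```java
import java.util.List;

import io.vertx.core.json.JsonObject;
import org.folio.rest.jaxrs.model.RequestQueueReordering;

class ReorderingPayloadExample {
  static JsonObject buildExamplePayload() {
    // UUIDs below are illustrative only
    RequestQueueReordering payload = new RequestQueueReordering()
      .withInstanceId("69640328-788e-43fc-9c3c-af39e243f3b7")
      .withRequestIds(List.of(
        "2b2a1ce2-2c3a-4b0d-9a3c-0a5d0e1f2a3b",
        "7f1e55e8-9c1d-4a4b-8d2e-6b7c8d9e0f1a"));

    // Serializes to {"instanceId": "...", "requestIds": ["...", "..."]}
    return JsonObject.mapFrom(payload);
  }
}
```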
1 change: 1 addition & 0 deletions ramls/request-storage-batch.raml
@@ -11,6 +1,7 @@ documentation:
types:
requests-batch: !include requests-batch.json
errors: !include raml-util/schemas/errors.schema
request-queue-reordering: !include request-queue-reordering.json

traits:
validate: !include raml-util/traits/validation.raml
25 changes: 8 additions & 17 deletions src/main/java/org/folio/rest/impl/RequestsBatchAPI.java
@@ -6,17 +6,14 @@
import static org.folio.rest.jaxrs.resource.RequestStorageBatch.PostRequestStorageBatchRequestsResponse.respond201;
import static org.folio.rest.jaxrs.resource.RequestStorageBatch.PostRequestStorageBatchRequestsResponse.respond422WithApplicationJson;
import static org.folio.rest.jaxrs.resource.RequestStorageBatch.PostRequestStorageBatchRequestsResponse.respond500WithTextPlain;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

import java.util.Map;

import javax.ws.rs.core.Response;
import org.folio.rest.annotations.Validate;
import org.folio.rest.jaxrs.model.RequestsBatch;
import org.folio.rest.jaxrs.resource.RequestStorageBatch;
import org.folio.rest.persist.PgUtil;
import org.folio.rest.tools.utils.MetadataUtil;
import org.folio.service.BatchResourceService;
import org.folio.service.request.RequestBatchResourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,7 +23,7 @@
import io.vertx.core.Handler;

public class RequestsBatchAPI implements RequestStorageBatch {
private static final Logger LOG = LoggerFactory.getLogger(RequestsBatchAPI.class);
private static final Logger log = LoggerFactory.getLogger(RequestsBatchAPI.class);

@Validate
@Override
@@ -38,36 +35,30 @@ public void postRequestStorageBatchRequests(
MetadataUtil.populateMetadata(entity.getRequests(), okapiHeaders);
} catch (Throwable e) {
String msg = "Cannot populate metadata of request list elements: " + e.getMessage();
LOG.error(msg, e);
log.error(msg, e);
asyncResultHandler.handle(succeededFuture(respond500WithTextPlain(msg)));
return;
}

BatchResourceService batchUpdateService = new BatchResourceService(
PgUtil.postgresClient(context, okapiHeaders)
);

RequestBatchResourceService requestBatchUpdateService =
new RequestBatchResourceService(tenantId(okapiHeaders), batchUpdateService);

requestBatchUpdateService.executeRequestBatchUpdate(entity.getRequests(),
updateResult -> {
log.info("postRequestStorageBatchRequests:: requests: {}", entity.getRequests());
new RequestBatchResourceService(context, okapiHeaders)
.executeRequestBatchUpdate(entity.getRequests(), updateResult -> {
// Successfully updated
if (updateResult.succeeded()) {
LOG.debug("Batch update executed successfully");
log.debug("Batch update executed successfully");
asyncResultHandler.handle(succeededFuture(respond201()));
return;
}

// Update failed because more than one request cannot hold the same position
if (hasSamePositionConstraintViolated(updateResult.cause())) {
LOG.warn("Same position constraint violated", updateResult.cause());
log.warn("Same position constraint violated", updateResult.cause());
asyncResultHandler.handle(succeededFuture(
respond422WithApplicationJson(samePositionInQueueError(null, null))
));
} else {
// Other failure occurred
LOG.warn("Unhandled error occurred during update", updateResult.cause());
log.warn("Unhandled error occurred during update", updateResult.cause());
asyncResultHandler.handle(succeededFuture(
respond500WithTextPlain(updateResult.cause().getMessage())
));
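
For context, a minimal client-side sketch of posting a batch to this endpoint is below. The path is inferred from the generated RequestStorageBatch resource; the port, tenant, and request fields are illustrative assumptions, not module defaults:

```java
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;

class RequestBatchClientExample {
  static void postBatch(Vertx vertx) {
    // Body shape follows requests-batch.json: a "requests" array of request records
    JsonObject body = new JsonObject()
      .put("requests", new JsonArray()
        .add(new JsonObject()
          .put("id", "2b2a1ce2-2c3a-4b0d-9a3c-0a5d0e1f2a3b") // illustrative UUID
          .put("position", 1)));

    WebClient.create(vertx)
      .post(8081, "localhost", "/request-storage-batch/requests") // path inferred from the RAML; port assumed
      .putHeader("x-okapi-tenant", "diku")                        // illustrative tenant
      .sendJsonObject(body)
      .onSuccess(response -> System.out.println("HTTP " + response.statusCode())) // 201 when the batch succeeds
      .onFailure(Throwable::printStackTrace);
  }
}
```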
40 changes: 27 additions & 13 deletions src/main/java/org/folio/service/BatchResourceService.java
@@ -23,7 +23,7 @@
import io.vertx.sqlclient.RowSet;

public class BatchResourceService {
private static final Logger LOG = LoggerFactory.getLogger(BatchResourceService.class);
private static final Logger log = LoggerFactory.getLogger(BatchResourceService.class);
private static final String WHERE_CLAUSE = "WHERE id = '%s'";

private final PostgresClient postgresClient;
@@ -38,14 +38,17 @@ public BatchResourceService(PostgresClient postgresClient) {
* @param batchFactories - Factory to create a batch update chunk.
* @param onFinishHandler - Callback.
*/
public void executeBatchUpdate(
public Future<Void> executeBatchUpdate(
List<Function<SQLConnection, Future<RowSet<Row>>>> batchFactories,
Handler<AsyncResult<Void>> onFinishHandler) {

Promise<Void> promise = Promise.promise();

postgresClient.startTx(connectionResult -> {
if (connectionResult.failed()) {
LOG.warn("Can not start transaction", connectionResult.cause());
log.warn("Cannot start transaction", connectionResult.cause());
onFinishHandler.handle(failedFuture(connectionResult.cause()));
promise.fail(connectionResult.cause());
return;
}

@@ -60,21 +63,32 @@ public void executeBatchUpdate(
// Handle overall update result and decide on whether to commit or rollback transaction
lastUpdate.onComplete(updateResult -> {
if (updateResult.failed()) {
LOG.warn("Batch update rejected", updateResult.cause());
log.warn("Batch update rejected", updateResult.cause());

// Rollback transaction and keep original cause.
postgresClient.rollbackTx(connectionResult,
rollback -> onFinishHandler.handle(failedFuture(updateResult.cause()))
);
postgresClient.rollbackTx(connectionResult, rollback -> {
onFinishHandler.handle(failedFuture(updateResult.cause()));
promise.fail(updateResult.cause());
});
} else {
LOG.debug("Update successful, committing transaction");

postgresClient.endTx(connectionResult, onFinishHandler);
log.debug("Update successful, committing transaction");

postgresClient.endTx(connectionResult, commitResult -> {
if (commitResult.succeeded()) {
onFinishHandler.handle(succeededFuture());
promise.complete();
} else {
log.warn("Failed to commit transaction", commitResult.cause());
onFinishHandler.handle(failedFuture(commitResult.cause()));
promise.fail(commitResult.cause());
}
});
}
});
});
}

return promise.future();
}
/**
* Creates update single entity batch function.
*
Expand All @@ -92,7 +106,7 @@ public <T> Function<SQLConnection, Future<RowSet<Row>>> updateSingleEntityBatchF
final Promise<RowSet<Row>> promise = promise();
final Future<SQLConnection> connectionResult = succeededFuture(connection);

LOG.debug("Updating entity {} with id {}", entity, id);
log.debug("Updating entity {} with id {}", entity, id);

postgresClient.update(connectionResult, tableName, entity, "jsonb",
String.format(WHERE_CLAUSE, id), false, promise);
Expand All @@ -113,7 +127,7 @@ public Function<SQLConnection, Future<RowSet<Row>>> queryWithParamsBatchFactory(
String query, Collection<?> params) {

return connection -> {
LOG.debug("Executing SQL [{}], got [{}] parameters", query, params.size());
log.debug("Executing SQL [{}], got [{}] parameters", query, params.size());

final Promise<RowSet<Row>> promise = promise();
final Future<SQLConnection> connectionResult = succeededFuture(connection);
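
Because executeBatchUpdate now returns a Future<Void> in addition to invoking the callback, callers can chain follow-up work onto the committed transaction. A minimal usage sketch; the SQL statement and id are placeholders, not the module's real operations:

```java
import java.util.List;

import io.vertx.core.Future;
import org.folio.rest.persist.PostgresClient;
import org.folio.service.BatchResourceService;

class BatchUpdateExample {
  static Future<Void> runBatch(PostgresClient postgresClient) {
    BatchResourceService service = new BatchResourceService(postgresClient);

    // One illustrative operation built with the factory shown above
    var operations = List.of(
      service.queryWithParamsBatchFactory(
        "UPDATE request SET jsonb = jsonb - 'position' WHERE id = $1",
        List.of("2b2a1ce2-2c3a-4b0d-9a3c-0a5d0e1f2a3b")));

    return service.executeBatchUpdate(operations, result -> {
        // legacy callback, still invoked after commit or rollback
      })
      .compose(v -> Future.<Void>succeededFuture()); // place for follow-up work, e.g. publishing an event
  }
}
```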
src/main/java/org/folio/service/event/EntityChangedEventPublisherFactory.java
@@ -5,6 +5,7 @@
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.CIRCULATION_SETTINGS;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.LOAN;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.REQUEST;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.REQUEST_QUEUE_REORDERING;
import static org.folio.support.kafka.topic.CirculationStorageKafkaTopic.RULES;

import java.util.Map;
@@ -19,6 +20,7 @@
import org.folio.rest.jaxrs.model.CirculationSetting;
import org.folio.rest.jaxrs.model.Loan;
import org.folio.rest.jaxrs.model.Request;
import org.folio.rest.jaxrs.model.RequestQueueReordering;

import io.vertx.core.Context;
import lombok.extern.log4j.Log4j2;
@@ -53,6 +55,15 @@ public static EntityChangedEventPublisher<String, Request> requestEventPublisher
new RequestRepository(vertxContext, okapiHeaders));
}

public static EntityChangedEventPublisher<String, RequestQueueReordering>
requestBatchEventPublisher(Context vertxContext, Map<String, String> okapiHeaders) {

return new EntityChangedEventPublisher<>(okapiHeaders, RequestQueueReordering::getInstanceId,
NULL_ID, new EntityChangedEventFactory<>(), new DomainEventPublisher<>(vertxContext,
REQUEST_QUEUE_REORDERING.fullTopicName(tenantId(okapiHeaders)),
FailureHandler.noOperation()), null);
}

public static EntityChangedEventPublisher<String, CheckIn> checkInEventPublisher(
Context vertxContext, Map<String, String> okapiHeaders) {

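
A minimal sketch of how a caller might use this factory, mirroring what RequestBatchResourceService does later in this PR. It assumes publishCreated returns a completable Future, as its use in the compose chain below suggests:

```java
import static org.folio.service.event.EntityChangedEventPublisherFactory.requestBatchEventPublisher;

import java.util.Map;

import io.vertx.core.Context;
import io.vertx.core.Future;
import org.folio.rest.jaxrs.model.RequestQueueReordering;

class ReorderingEventExample {
  static Future<Void> publishReordering(Context vertxContext, Map<String, String> okapiHeaders,
      RequestQueueReordering payload) {

    // The instance id doubles as the event key (see the key extractor passed in the factory above)
    return requestBatchEventPublisher(vertxContext, okapiHeaders)
      .publishCreated(payload.getInstanceId(), payload);
  }
}
```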
src/main/java/org/folio/service/request/RequestBatchResourceService.java
@@ -2,21 +2,28 @@

import static java.util.stream.IntStream.rangeClosed;
import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard;
import static org.folio.rest.tools.utils.TenantTool.tenantId;
import static org.folio.service.event.EntityChangedEventPublisherFactory.requestBatchEventPublisher;
import static org.folio.support.ModuleConstants.REQUEST_TABLE;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.folio.rest.jaxrs.model.Request;
import org.folio.rest.jaxrs.model.RequestQueueReordering;
import org.folio.rest.persist.PgUtil;
import org.folio.rest.persist.SQLConnection;
import org.folio.service.BatchResourceService;
import org.folio.service.event.EntityChangedEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.sqlclient.Row;
@@ -29,12 +36,13 @@ public class RequestBatchResourceService {

private final BatchResourceService batchResourceService;
private final String tenantName;
private final EntityChangedEventPublisher<String, RequestQueueReordering> eventPublisher;

public RequestBatchResourceService(String tenantName,
BatchResourceService batchResourceService) {

this.batchResourceService = batchResourceService;
this.tenantName = tenantName;
public RequestBatchResourceService(Context context, Map<String, String> okapiHeaders) {
this.batchResourceService = new BatchResourceService(PgUtil.postgresClient(context,
okapiHeaders));
this.tenantName = tenantId(okapiHeaders);
this.eventPublisher = requestBatchEventPublisher(context, okapiHeaders);
}

/**
@@ -59,8 +67,8 @@ public RequestBatchResourceService(String tenantName,
* @param requests - List of requests to execute in batch.
* @param onFinishHandler - Callback function.
*/
public void executeRequestBatchUpdate(
List<Request> requests, Handler<AsyncResult<Void>> onFinishHandler) {
public void executeRequestBatchUpdate(List<Request> requests,
Handler<AsyncResult<Void>> onFinishHandler) {

LOG.debug("Removing positions for all request to go through positions constraint");
List<Function<SQLConnection, Future<RowSet<Row>>>> allDatabaseOperations =
@@ -80,7 +88,21 @@ public void executeRequestBatchUpdate(
LOG.info("Executing batch update, total records to update [{}] (including remove positions)",
allDatabaseOperations.size());

batchResourceService.executeBatchUpdate(allDatabaseOperations, onFinishHandler);
RequestQueueReordering payload = mapRequestsToPayload(requests);
LOG.info("executeRequestBatchUpdate:: instanceId: {}, requests: {}",
payload.getInstanceId(), payload.getRequestIds());

batchResourceService.executeBatchUpdate(allDatabaseOperations, onFinishHandler)
.compose(v -> eventPublisher.publishCreated(payload.getInstanceId(), payload));
}

private RequestQueueReordering mapRequestsToPayload(List<Request> requests) {

return new RequestQueueReordering()
.withRequestIds(requests.stream()
.map(Request::getId)
.toList())
.withInstanceId(requests.get(0).getInstanceId());
}

private Function<SQLConnection, Future<RowSet<Row>>> removePositionsForRequestsBatch(
src/main/java/org/folio/support/kafka/topic/CirculationStorageKafkaTopic.java
@@ -4,6 +4,7 @@

public enum CirculationStorageKafkaTopic implements KafkaTopic {
REQUEST("request", 10),
REQUEST_QUEUE_REORDERING("request-queue-reordering", 10),
CIRCULATION_SETTINGS("circulation-settings", 10),
LOAN("loan", 10),
CHECK_IN("check-in", 10),
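
The new enum constant pairs the local topic name with a partition count. A small sketch of how the topic might be inspected; the topicName() and numPartitions() accessors are assumed from the KafkaTopic interface, and the exact tenant-qualified prefix produced by fullTopicName is determined by folio-kafka-wrapper:

```java
import org.folio.support.kafka.topic.CirculationStorageKafkaTopic;

class TopicExample {
  static void describeTopic() {
    CirculationStorageKafkaTopic topic = CirculationStorageKafkaTopic.REQUEST_QUEUE_REORDERING;

    // Assumed accessors from the KafkaTopic interface
    System.out.println(topic.topicName());      // "request-queue-reordering"
    System.out.println(topic.numPartitions());  // 10

    // Tenant-qualified name used when publishing; environment/namespace prefixing
    // comes from folio-kafka-wrapper
    System.out.println(topic.fullTopicName("diku"));
  }
}
```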