From 4132811d4438fa50f8578e2ed607eff03704182a Mon Sep 17 00:00:00 2001
From: Viachaslau Khandramai
Date: Mon, 30 May 2022 10:59:05 +0300
Subject: [PATCH] MODOAIPMH-420 - Save UUIDs of failed instances (#263)

* MODOAIPMH-420 - Statistics API
---
 .../oaipmh/dao/impl/InstancesDaoImpl.java     | 24 +++--
 .../MarcWithHoldingsRequestHelper.java        | 87 ++++++++++++-------
 ...--16-00-add-extended-statistics-tables.xml | 20 ++++-
 3 files changed, 83 insertions(+), 48 deletions(-)

diff --git a/src/main/java/org/folio/oaipmh/dao/impl/InstancesDaoImpl.java b/src/main/java/org/folio/oaipmh/dao/impl/InstancesDaoImpl.java
index adffe57f..49363129 100644
--- a/src/main/java/org/folio/oaipmh/dao/impl/InstancesDaoImpl.java
+++ b/src/main/java/org/folio/oaipmh/dao/impl/InstancesDaoImpl.java
@@ -42,7 +42,6 @@ import org.folio.rest.jooq.tables.records.RequestMetadataLbRecord;
 import org.folio.rest.jooq.tables.records.SkippedInstancesIdsRecord;
 import org.folio.rest.jooq.tables.records.SuppressedFromDiscoveryInstancesIdsRecord;
-import org.jooq.InsertValuesStep2;
 import org.jooq.InsertValuesStep3;
 import org.jooq.Record;
 import org.springframework.stereotype.Repository;
@@ -206,33 +205,33 @@ public Future updateRequestUpdatedDateAndStatistics(String re
       });
     var saveFailedToSaveInstancesIds = holder.getFailedToSaveInstancesIds().isEmpty() ? Future.succeededFuture()
       : queryExecutor.execute(dslContext -> {
-        InsertValuesStep2 insertValues = dslContext.insertInto(FAILED_TO_SAVE_INSTANCES_IDS, FAILED_TO_SAVE_INSTANCES_IDS.REQUEST_ID,
+        InsertValuesStep3 insertValues = dslContext.insertInto(FAILED_TO_SAVE_INSTANCES_IDS, FAILED_TO_SAVE_INSTANCES_IDS.ID, FAILED_TO_SAVE_INSTANCES_IDS.REQUEST_ID,
           FAILED_TO_SAVE_INSTANCES_IDS.INSTANCE_ID);
-        holder.getFailedToSaveInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
+        holder.getFailedToSaveInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
         return insertValues;
       })
         .map(rows -> null);
     var saveFailedInstancesIds = holder.getFailedInstancesIds().isEmpty() ? Future.succeededFuture()
       : queryExecutor.execute(dslContext -> {
-        InsertValuesStep2 insertValues = dslContext.insertInto(FAILED_INSTANCES_IDS, FAILED_INSTANCES_IDS.REQUEST_ID,
+        InsertValuesStep3 insertValues = dslContext.insertInto(FAILED_INSTANCES_IDS, FAILED_INSTANCES_IDS.ID, FAILED_INSTANCES_IDS.REQUEST_ID,
           FAILED_INSTANCES_IDS.INSTANCE_ID);
-        holder.getFailedInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
+        holder.getFailedInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
         return insertValues;
       })
        .map(rows -> null);
     var saveSkippedInstancesIds = holder.getSkippedInstancesIds().isEmpty() ? Future.succeededFuture()
       : queryExecutor.execute(dslContext -> {
-        InsertValuesStep2 insertValues = dslContext.insertInto(SKIPPED_INSTANCES_IDS, SKIPPED_INSTANCES_IDS.REQUEST_ID,
+        InsertValuesStep3 insertValues = dslContext.insertInto(SKIPPED_INSTANCES_IDS, SKIPPED_INSTANCES_IDS.ID, SKIPPED_INSTANCES_IDS.REQUEST_ID,
          SKIPPED_INSTANCES_IDS.INSTANCE_ID);
-        holder.getSkippedInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
+        holder.getSkippedInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
         return insertValues;
       })
        .map(rows -> null);
     var saveSuppressedFromDiscoveryInstancesIds = holder.getSuppressedInstancesIds().isEmpty() ? Future.succeededFuture()
       : queryExecutor.execute(dslContext -> {
-        InsertValuesStep2 insertValues = dslContext.insertInto(SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS, SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.REQUEST_ID,
+        InsertValuesStep3 insertValues = dslContext.insertInto(SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS, SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.ID, SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.REQUEST_ID,
           SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.INSTANCE_ID);
-        holder.getSuppressedInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
+        holder.getSuppressedInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
         return insertValues;
       })
        .map(rows -> null);
@@ -399,7 +398,7 @@ private RequestMetadataCollection queryResultToRequestMetadataCollection(QueryRe
   private UuidCollection queryResultToUuidCollection(QueryResult queryResult, int totalRecordsCount) {
     List list = queryResult.stream()
-      .map(row -> rowToUUID(row.unwrap()))
+      .map(row -> rowToUUID(row.unwrap()).toString())
       .collect(toList());
     return new UuidCollection().withUuidCollection(list)
       .withTotalRecords(totalRecordsCount);
@@ -424,9 +423,8 @@ private RequestMetadata rowToRequestMetadata(Row row) {
     return requestMetadata;
   }

-  private String rowToUUID(Row row) {
-    var pojo = RowMappers.getFailedInstancesIdsMapper().apply(row);
-    return pojo.getInstanceId().toString();
+  private UUID rowToUUID(Row row) {
+    return (UUID) row.getValue("instance_id");
   }
 }

diff --git a/src/main/java/org/folio/oaipmh/processors/MarcWithHoldingsRequestHelper.java b/src/main/java/org/folio/oaipmh/processors/MarcWithHoldingsRequestHelper.java
index fd42ec4a..adec884e 100644
--- a/src/main/java/org/folio/oaipmh/processors/MarcWithHoldingsRequestHelper.java
+++ b/src/main/java/org/folio/oaipmh/processors/MarcWithHoldingsRequestHelper.java
@@ -135,7 +135,9 @@ public class MarcWithHoldingsRequestHelper extends AbstractHelper {
   private final WorkerExecutor saveInstancesExecutor;
   private final Context downloadContext;

-  private MetricsCollectingService metricsCollectingService = MetricsCollectingService.getInstance();
+  private final AtomicInteger batchesSizeCounter = new AtomicInteger();
+
+  private final MetricsCollectingService metricsCollectingService = MetricsCollectingService.getInstance();
   private InstancesService instancesService;

   public static MarcWithHoldingsRequestHelper getInstance() {
@@ -202,26 +204,29 @@ public Future handle(Request request, Context vertxContext) {
   }

-  private void updateRequestStreamEnded(String requestId, String tenantId, StatisticsHolder holder) {
+  private void updateRequestStreamEnded(String requestId, String tenantId, StatisticsHolder downloadInstancesStatistics) {
     Promise promise = Promise.promise();
-    PostgresClient.getInstance(downloadContext.owner(), tenantId).withTrans(connection -> {
-      Tuple params = Tuple.of(true, UUID.fromString(requestId), holder.getDownloadedAndSavedInstancesCounter(), holder.getFailedToSaveInstancesCounter());
+      Tuple params = Tuple.of(true, UUID.fromString(requestId), downloadInstancesStatistics.getDownloadedAndSavedInstancesCounter(), downloadInstancesStatistics.getFailedToSaveInstancesCounter());
       String updateRequestMetadataSql = "UPDATE " + PostgresClient.convertToPsqlStandard(tenantId) + ".request_metadata_lb SET stream_ended = $1, downloaded_and_saved_instances_counter = $3, failed_to_save_instances_counter = $4 WHERE request_id = $2";
       List batch = new ArrayList<>();
-      holder.getFailedToSaveInstancesIds().forEach(instanceId -> batch.add(Tuple.of(UUID.fromString(requestId), UUID.fromString(instanceId))));
+      downloadInstancesStatistics.getFailedToSaveInstancesIds()
+        .forEach(instanceId -> batch.add(Tuple.of(UUID.fromString(requestId), UUID.fromString(instanceId))));
       String sql = "INSERT INTO " + PostgresClient.convertToPsqlStandard(tenantId) + ".failed_to_save_instances_ids (request_id, instance_id) VALUES ($1, $2)";
       connection.execute(updateRequestMetadataSql, params)
         .compose(x -> connection.execute(sql, batch))
         .onComplete(result -> {
+          connection.getPgConnection().close();
           if (result.failed()) {
-            promise.fail(result.cause());
-
+            var error = result.cause();
+            logger.error("Error updating request metadata on instances stream completion.", error);
+            promise.fail(error);
           } else {
+            logger.info("Updating request metadata on instances stream completion finished.");
             promise.complete();
           }
         });
@@ -229,9 +234,8 @@ private void updateRequestStreamEnded(String requestId, String tenantId, Statist
     });
   }

-  private Future processBatch(Request request, Context context, Promise oaiPmhResponsePromise, String requestId,
+  private void processBatch(Request request, Context context, Promise oaiPmhResponsePromise, String requestId,
                              boolean firstBatch, StatisticsHolder statistics, OffsetDateTime lastUpdateDate) {
-    Promise promise = Promise.promise();
     try {
       boolean deletedRecordSupport = RepositoryConfigurationUtil.isDeletedRecordsEnabled(request.getRequestId());
       int batchSize = Integer
@@ -249,6 +253,7 @@ private Future processBatch(Request request, Context context, Promise
       List instances = fut.result();
       logger.debug("Processing instances: {}.", instances.size());
       if (CollectionUtils.isEmpty(instances) && !firstBatch) {
+        logger.error("Error: Instances collection is empty for non-first batch.");
         handleException(oaiPmhResponsePromise, new IllegalArgumentException("Specified resumption token doesn't exists."));
         return;
       }
@@ -262,9 +267,8 @@ private Future processBatch(Request request, Context context, Promise
-          buildRecordsResponse(request, requestId, instances, new HashMap<>(), firstBatch, null, deletedRecordSupport, statistics)
+          buildRecordsResponse(request, requestId, instances, lastUpdateDate, new HashMap<>(), firstBatch, null, deletedRecordSupport, statistics)
             .onSuccess(oaiPmhResponsePromise::complete)
-            .onComplete(x -> instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant()))
             .onFailure(e -> handleException(oaiPmhResponsePromise, e));
           return;
         }
@@ -279,18 +283,15 @@ private Future processBatch(Request request, Context context, Promise
-            .onSuccess(res -> buildRecordsResponse(request, requestId, instancesWithoutLast, res, firstBatch, nextInstanceId,
+            .onSuccess(res -> buildRecordsResponse(request, requestId, instancesWithoutLast, lastUpdateDate, res, firstBatch, nextInstanceId,
               deletedRecordSupport, statistics)
             .onSuccess(oaiPmhResponsePromise::complete)
-            .onComplete(x -> instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant()))
-            .onSuccess(p -> promise.complete())
             .onFailure(e -> handleException(oaiPmhResponsePromise, e)))
           .onFailure(e -> handleException(oaiPmhResponsePromise, e));
       });
     } catch (Exception e) {
       handleException(oaiPmhResponsePromise, e);
     }
-    return promise.future();
   }

   private SourceStorageSourceRecordsClientWrapper createAndSetupSrsClient(Request request) {
@@ -316,19 +317,43 @@ private void setupBatchHttpStream(Promise promise, HttpRequestImpl in
     var batch = new ArrayList();
     jsonParser.handler(event -> {
       batch.add(event);
-      if (batch.size() >= DATABASE_FETCHING_CHUNK_SIZE) {
+      var size = batch.size();
+      if (size >= DATABASE_FETCHING_CHUNK_SIZE) {
         jsonParser.pause();
         saveInstancesIds(new ArrayList<>(batch), tenant, requestId, postgresClient).onComplete(result -> {
-          completeBatchAndUpdateDownloadStatistics(downloadInstancesStatistics, batch, result);
+          if (result.succeeded()) {
+            downloadInstancesStatistics.addDownloadedAndSavedInstancesCounter(size);
+          } else {
+            downloadInstancesStatistics.addFailedToSaveInstancesCounter(size);
+            var ids = batch.stream()
+              .map(instance -> instance.objectValue().getString(INSTANCE_ID_FIELD_NAME)).collect(toList());
+            downloadInstancesStatistics.addFailedToSaveInstancesIds(ids);
+          }
+          batch.clear();
           jsonParser.resume();
         });
       }
     });
     jsonParser.endHandler(e -> {
       if (!batch.isEmpty()) {
+        var size = batch.size();
         saveInstancesIds(new ArrayList<>(batch), tenant, requestId, postgresClient)
-          .onComplete(result -> completeBatchAndUpdateDownloadStatistics(downloadInstancesStatistics, batch, result)).onComplete(vVoid -> downloadInstancesPromise.complete());
+          .onComplete(result -> {
+            if (result.succeeded()) {
+              downloadInstancesStatistics.addDownloadedAndSavedInstancesCounter(size);
+            } else {
+              downloadInstancesStatistics.addFailedToSaveInstancesCounter(size);
+              var ids = batch.stream()
+                .map(instance -> instance.objectValue().getString(INSTANCE_ID_FIELD_NAME)).collect(toList());
+              downloadInstancesStatistics.addFailedToSaveInstancesIds(ids);
+            }
+            batch.clear();
+          }).onComplete(vVoid -> {
+            logger.info("Completing batch processing for requestId: {}. Last batch size was: {}.", requestId, size);
+            downloadInstancesPromise.complete();
+          });
       } else {
+        logger.info("Completing batch processing for requestId: {}. Last batch was empty.", requestId);
         downloadInstancesPromise.complete();
       }
     });
@@ -371,15 +396,6 @@ private void setupBatchHttpStream(Promise promise, HttpRequestImpl in
     });
   }

-  private void completeBatchAndUpdateDownloadStatistics(StatisticsHolder downloadInstancesStatistics, ArrayList batch, AsyncResult result) {
-    if (result.succeeded()) {
-      downloadInstancesStatistics.getDownloadedAndSavedInstancesCounter().addAndGet(batch.size());
-    } else {
-      downloadInstancesStatistics.getFailedToSaveInstancesCounter().addAndGet(batch.size());
-    }
-    batch.clear();
-  }
-
   private HttpRequest buildInventoryQuery(Request request) {
     Map paramMap = new HashMap<>();
     Date date = convertStringToDate(request.getFrom(), false, false);
@@ -623,10 +639,12 @@ private void adjustItems(JsonObject instance) {
     }
   }

-  private Future buildRecordsResponse(Request request, String requestId, List batch,
+  private Future buildRecordsResponse(Request request, String requestId, List batch, OffsetDateTime lastUpdateDate,
                                       Map srsResponse, boolean firstBatch,
                                       String nextInstanceId, boolean deletedRecordSupport, StatisticsHolder statistics) {
     Promise promise = Promise.promise();
+    // Set incoming instances number
+    batchesSizeCounter.addAndGet(batch.size());
     try {
       List records = buildRecordsList(request, batch, srsResponse, deletedRecordSupport, statistics);
       logger.debug("Build records response, instances = {}, instances with srs records = {}.", batch.size(), records.size());
@@ -649,10 +667,11 @@ private Future buildRecordsResponse(Request request, String requestId,
       } else {
         response = responseHelper.buildFailureResponse(oaipmh, request);
       }
-
-      promise.complete(response);
+      instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant())
+        .onComplete(x -> promise.complete(response));
     } catch (Exception e) {
-      handleException(promise, e);
+      instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant())
+        .onComplete(x -> handleException(promise, e));
     }
     return promise.future();
   }
@@ -721,6 +740,7 @@ private ResumptionTokenType buildResumptionTokenFromRequest(Request request, Str
                                                              String nextInstanceId) {
     long cursor = request.getOffset();
     if (nextInstanceId == null) {
+      logHarvestingCompletion();
       return new ResumptionTokenType()
         .withValue("")
         .withCursor(BigInteger.valueOf(cursor));
@@ -746,6 +766,11 @@ private ResumptionTokenType buildResumptionTokenFromRequest(Request request, Str
       .withCursor(BigInteger.valueOf(cursor));
   }

+  private void logHarvestingCompletion() {
+    logger.info("Harvesting completed. Number of processed instances: {}.", batchesSizeCounter.get());
+    batchesSizeCounter.setRelease(0);
+  }
+
   private RecordType createRecord(Request request, JsonObject instance, String instanceId) {
     String identifierPrefix = request.getIdentifierPrefix();
     return new RecordType().withHeader(createHeader(instance, request).withIdentifier(getIdentifier(identifierPrefix, instanceId)));
diff --git a/src/main/resources/liquibase/tenant/scripts/2022-05-18--16-00-add-extended-statistics-tables.xml b/src/main/resources/liquibase/tenant/scripts/2022-05-18--16-00-add-extended-statistics-tables.xml
index 3961a5cb..a7aac49c 100644
--- a/src/main/resources/liquibase/tenant/scripts/2022-05-18--16-00-add-extended-statistics-tables.xml
+++ b/src/main/resources/liquibase/tenant/scripts/2022-05-18--16-00-add-extended-statistics-tables.xml
@@ -7,33 +7,45 @@
 [XML hunk body lost in extraction: the markup between the diff markers was stripped. The surviving markers show four identical change blocks, each replacing one line with four, one per statistics table touched above (failed_to_save_instances_ids, failed_instances_ids, skipped_instances_ids, suppressed_from_discovery_instances_ids), consistent with the new ID column used by the jOOQ inserts in InstancesDaoImpl.]