Skip to content

Commit

Permalink
MODOAIPMH-420 - Save UUIDs of failed instances (#263)
Browse files Browse the repository at this point in the history
* MODOAIPMH-420 - Statistics API
  • Loading branch information
khandramai committed May 30, 2022
1 parent c58d780 commit 4132811
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 48 deletions.
24 changes: 11 additions & 13 deletions src/main/java/org/folio/oaipmh/dao/impl/InstancesDaoImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.folio.rest.jooq.tables.records.RequestMetadataLbRecord;
import org.folio.rest.jooq.tables.records.SkippedInstancesIdsRecord;
import org.folio.rest.jooq.tables.records.SuppressedFromDiscoveryInstancesIdsRecord;
import org.jooq.InsertValuesStep2;
import org.jooq.InsertValuesStep3;
import org.jooq.Record;
import org.springframework.stereotype.Repository;
Expand Down Expand Up @@ -206,33 +205,33 @@ public Future<RequestMetadataLb> updateRequestUpdatedDateAndStatistics(String re
});

var saveFailedToSaveInstancesIds = holder.getFailedToSaveInstancesIds().isEmpty() ? Future.succeededFuture() : queryExecutor.execute(dslContext -> {
InsertValuesStep2<FailedToSaveInstancesIdsRecord, UUID, UUID> insertValues = dslContext.insertInto(FAILED_TO_SAVE_INSTANCES_IDS, FAILED_TO_SAVE_INSTANCES_IDS.REQUEST_ID,
InsertValuesStep3<FailedToSaveInstancesIdsRecord, UUID, UUID, UUID> insertValues = dslContext.insertInto(FAILED_TO_SAVE_INSTANCES_IDS, FAILED_TO_SAVE_INSTANCES_IDS.ID, FAILED_TO_SAVE_INSTANCES_IDS.REQUEST_ID,
FAILED_TO_SAVE_INSTANCES_IDS.INSTANCE_ID);
holder.getFailedToSaveInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
holder.getFailedToSaveInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
return insertValues;
})
.map(rows -> null);

var saveFailedInstancesIds = holder.getFailedInstancesIds().isEmpty() ? Future.succeededFuture() : queryExecutor.execute(dslContext -> {
InsertValuesStep2<FailedInstancesIdsRecord, UUID, UUID> insertValues = dslContext.insertInto(FAILED_INSTANCES_IDS, FAILED_INSTANCES_IDS.REQUEST_ID,
InsertValuesStep3<FailedInstancesIdsRecord, UUID, UUID, UUID> insertValues = dslContext.insertInto(FAILED_INSTANCES_IDS, FAILED_INSTANCES_IDS.ID, FAILED_INSTANCES_IDS.REQUEST_ID,
FAILED_INSTANCES_IDS.INSTANCE_ID);
holder.getFailedInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
holder.getFailedInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
return insertValues;
})
.map(rows -> null);

var saveSkippedInstancesIds = holder.getSkippedInstancesIds().isEmpty() ? Future.succeededFuture() : queryExecutor.execute(dslContext -> {
InsertValuesStep2<SkippedInstancesIdsRecord, UUID, UUID> insertValues = dslContext.insertInto(SKIPPED_INSTANCES_IDS, SKIPPED_INSTANCES_IDS.REQUEST_ID,
InsertValuesStep3<SkippedInstancesIdsRecord, UUID, UUID, UUID> insertValues = dslContext.insertInto(SKIPPED_INSTANCES_IDS, SKIPPED_INSTANCES_IDS.ID, SKIPPED_INSTANCES_IDS.REQUEST_ID,
SKIPPED_INSTANCES_IDS.INSTANCE_ID);
holder.getSkippedInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
holder.getSkippedInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
return insertValues;
})
.map(rows -> null);

var saveSuppressedFromDiscoveryInstancesIds = holder.getSuppressedInstancesIds().isEmpty() ? Future.succeededFuture() : queryExecutor.execute(dslContext -> {
InsertValuesStep2<SuppressedFromDiscoveryInstancesIdsRecord, UUID, UUID> insertValues = dslContext.insertInto(SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS, SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.REQUEST_ID,
InsertValuesStep3<SuppressedFromDiscoveryInstancesIdsRecord, UUID, UUID, UUID> insertValues = dslContext.insertInto(SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS, SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.ID, SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.REQUEST_ID,
SUPPRESSED_FROM_DISCOVERY_INSTANCES_IDS.INSTANCE_ID);
holder.getSuppressedInstancesIds().forEach(id -> insertValues.values(UUID.fromString(requestId), UUID.fromString(id)));
holder.getSuppressedInstancesIds().forEach(id -> insertValues.values(UUID.randomUUID(), UUID.fromString(requestId), UUID.fromString(id)));
return insertValues;
})
.map(rows -> null);
Expand Down Expand Up @@ -399,7 +398,7 @@ private RequestMetadataCollection queryResultToRequestMetadataCollection(QueryRe

private UuidCollection queryResultToUuidCollection(QueryResult queryResult, int totalRecordsCount) {
List<String> list = queryResult.stream()
.map(row -> rowToUUID(row.unwrap()))
.map(row -> rowToUUID(row.unwrap()).toString())
.collect(toList());
return new UuidCollection().withUuidCollection(list)
.withTotalRecords(totalRecordsCount);
Expand All @@ -424,9 +423,8 @@ private RequestMetadata rowToRequestMetadata(Row row) {
return requestMetadata;
}

private String rowToUUID(Row row) {
var pojo = RowMappers.getFailedInstancesIdsMapper().apply(row);
return pojo.getInstanceId().toString();
private UUID rowToUUID(Row row) {
return (UUID) row.getValue("instance_id");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ public class MarcWithHoldingsRequestHelper extends AbstractHelper {
private final WorkerExecutor saveInstancesExecutor;
private final Context downloadContext;

private MetricsCollectingService metricsCollectingService = MetricsCollectingService.getInstance();
private final AtomicInteger batchesSizeCounter = new AtomicInteger();

private final MetricsCollectingService metricsCollectingService = MetricsCollectingService.getInstance();
private InstancesService instancesService;

public static MarcWithHoldingsRequestHelper getInstance() {
Expand Down Expand Up @@ -202,36 +204,38 @@ public Future<Response> handle(Request request, Context vertxContext) {
}


private void updateRequestStreamEnded(String requestId, String tenantId, StatisticsHolder holder) {
private void updateRequestStreamEnded(String requestId, String tenantId, StatisticsHolder downloadInstancesStatistics) {
Promise<Void> promise = Promise.promise();

PostgresClient.getInstance(downloadContext.owner(), tenantId).withTrans(connection -> {
Tuple params = Tuple.of(true, UUID.fromString(requestId), holder.getDownloadedAndSavedInstancesCounter(), holder.getFailedToSaveInstancesCounter());
Tuple params = Tuple.of(true, UUID.fromString(requestId), downloadInstancesStatistics.getDownloadedAndSavedInstancesCounter(), downloadInstancesStatistics.getFailedToSaveInstancesCounter());
String updateRequestMetadataSql = "UPDATE " + PostgresClient.convertToPsqlStandard(tenantId)
+ ".request_metadata_lb SET stream_ended = $1, downloaded_and_saved_instances_counter = $3, failed_to_save_instances_counter = $4 WHERE request_id = $2";

List<Tuple> batch = new ArrayList<>();
holder.getFailedToSaveInstancesIds().forEach(instanceId -> batch.add(Tuple.of(UUID.fromString(requestId), UUID.fromString(instanceId))));
downloadInstancesStatistics.getFailedToSaveInstancesIds()
.forEach(instanceId -> batch.add(Tuple.of(UUID.fromString(requestId), UUID.fromString(instanceId))));
String sql = "INSERT INTO " + PostgresClient.convertToPsqlStandard(tenantId)
+ ".failed_to_save_instances_ids (request_id, instance_id) VALUES ($1, $2)";

connection.execute(updateRequestMetadataSql, params)
.compose(x -> connection.execute(sql, batch))
.onComplete(result -> {
connection.getPgConnection().close();
if (result.failed()) {
promise.fail(result.cause());

var error = result.cause();
logger.error("Error updating request metadata on instances stream completion.", error);
promise.fail(error);
} else {
logger.info("Updating request metadata on instances stream completion finished.");
promise.complete();
}
});
return Future.succeededFuture();
});
}

private Future<Void> processBatch(Request request, Context context, Promise<Response> oaiPmhResponsePromise, String requestId,
private void processBatch(Request request, Context context, Promise<Response> oaiPmhResponsePromise, String requestId,
boolean firstBatch, StatisticsHolder statistics, OffsetDateTime lastUpdateDate) {
Promise<Void> promise = Promise.promise();
try {
boolean deletedRecordSupport = RepositoryConfigurationUtil.isDeletedRecordsEnabled(request.getRequestId());
int batchSize = Integer
Expand All @@ -249,6 +253,7 @@ private Future<Void> processBatch(Request request, Context context, Promise<Resp
List<JsonObject> instances = fut.result();
logger.debug("Processing instances: {}.", instances.size());
if (CollectionUtils.isEmpty(instances) && !firstBatch) {
logger.error("Error: Instances collection is empty for non-first batch.");
handleException(oaiPmhResponsePromise, new IllegalArgumentException("Specified resumption token doesn't exists."));
return;
}
Expand All @@ -262,9 +267,8 @@ private Future<Void> processBatch(Request request, Context context, Promise<Resp

if (CollectionUtils.isEmpty(instances)) {
logger.debug("Got empty instances.");
buildRecordsResponse(request, requestId, instances, new HashMap<>(), firstBatch, null, deletedRecordSupport, statistics)
buildRecordsResponse(request, requestId, instances, lastUpdateDate, new HashMap<>(), firstBatch, null, deletedRecordSupport, statistics)
.onSuccess(oaiPmhResponsePromise::complete)
.onComplete(x -> instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant()))
.onFailure(e -> handleException(oaiPmhResponsePromise, e));
return;
}
Expand All @@ -279,18 +283,15 @@ private Future<Void> processBatch(Request request, Context context, Promise<Resp
.parseInt(RepositoryConfigurationUtil.getProperty(request.getRequestId(), REPOSITORY_SRS_HTTP_REQUEST_RETRY_ATTEMPTS));

requestSRSByIdentifiers(srsClient, context.owner(), instancesWithoutLast, deletedRecordSupport, retryAttempts)
.onSuccess(res -> buildRecordsResponse(request, requestId, instancesWithoutLast, res, firstBatch, nextInstanceId,
.onSuccess(res -> buildRecordsResponse(request, requestId, instancesWithoutLast, lastUpdateDate, res, firstBatch, nextInstanceId,
deletedRecordSupport, statistics)
.onSuccess(oaiPmhResponsePromise::complete)
.onComplete(x -> instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant()))
.onSuccess(p -> promise.complete())
.onFailure(e -> handleException(oaiPmhResponsePromise, e)))
.onFailure(e -> handleException(oaiPmhResponsePromise, e));
});
} catch (Exception e) {
handleException(oaiPmhResponsePromise, e);
}
return promise.future();
}

private SourceStorageSourceRecordsClientWrapper createAndSetupSrsClient(Request request) {
Expand All @@ -316,19 +317,43 @@ private void setupBatchHttpStream(Promise<?> promise, HttpRequestImpl<Buffer> in
var batch = new ArrayList<JsonEvent>();
jsonParser.handler(event -> {
batch.add(event);
if (batch.size() >= DATABASE_FETCHING_CHUNK_SIZE) {
var size = batch.size();
if (size >= DATABASE_FETCHING_CHUNK_SIZE) {
jsonParser.pause();
saveInstancesIds(new ArrayList<>(batch), tenant, requestId, postgresClient).onComplete(result -> {
completeBatchAndUpdateDownloadStatistics(downloadInstancesStatistics, batch, result);
if (result.succeeded()) {
downloadInstancesStatistics.addDownloadedAndSavedInstancesCounter(size);
} else {
downloadInstancesStatistics.addFailedToSaveInstancesCounter(size);
var ids = batch.stream()
.map(instance -> instance.objectValue().getString(INSTANCE_ID_FIELD_NAME)).collect(toList());
downloadInstancesStatistics.addFailedToSaveInstancesIds(ids);
}
batch.clear();
jsonParser.resume();
});
}
});
jsonParser.endHandler(e -> {
if (!batch.isEmpty()) {
var size = batch.size();
saveInstancesIds(new ArrayList<>(batch), tenant, requestId, postgresClient)
.onComplete(result -> completeBatchAndUpdateDownloadStatistics(downloadInstancesStatistics, batch, result)).onComplete(vVoid -> downloadInstancesPromise.complete());
.onComplete(result -> {
if (result.succeeded()) {
downloadInstancesStatistics.addDownloadedAndSavedInstancesCounter(size);
} else {
downloadInstancesStatistics.addFailedToSaveInstancesCounter(size);
var ids = batch.stream()
.map(instance -> instance.objectValue().getString(INSTANCE_ID_FIELD_NAME)).collect(toList());
downloadInstancesStatistics.addFailedToSaveInstancesIds(ids);
}
batch.clear();
}).onComplete(vVoid -> {
logger.info("Completing batch processing for requestId: {}. Last batch size was: {}.", requestId, size);
downloadInstancesPromise.complete();
});
} else {
logger.info("Completing batch processing for requestId: {}. Last batch was empty.", requestId);
downloadInstancesPromise.complete();
}
});
Expand Down Expand Up @@ -371,15 +396,6 @@ private void setupBatchHttpStream(Promise<?> promise, HttpRequestImpl<Buffer> in
});
}

private void completeBatchAndUpdateDownloadStatistics(StatisticsHolder downloadInstancesStatistics, ArrayList<JsonEvent> batch, AsyncResult<Void> result) {
if (result.succeeded()) {
downloadInstancesStatistics.getDownloadedAndSavedInstancesCounter().addAndGet(batch.size());
} else {
downloadInstancesStatistics.getFailedToSaveInstancesCounter().addAndGet(batch.size());
}
batch.clear();
}

private HttpRequest<Buffer> buildInventoryQuery(Request request) {
Map<String, String> paramMap = new HashMap<>();
Date date = convertStringToDate(request.getFrom(), false, false);
Expand Down Expand Up @@ -623,10 +639,12 @@ private void adjustItems(JsonObject instance) {
}
}

private Future<Response> buildRecordsResponse(Request request, String requestId, List<JsonObject> batch,
private Future<Response> buildRecordsResponse(Request request, String requestId, List<JsonObject> batch, OffsetDateTime lastUpdateDate,
Map<String, JsonObject> srsResponse, boolean firstBatch, String nextInstanceId, boolean deletedRecordSupport, StatisticsHolder statistics) {

Promise<Response> promise = Promise.promise();
// Set incoming instances number
batchesSizeCounter.addAndGet(batch.size());
try {
List<RecordType> records = buildRecordsList(request, batch, srsResponse, deletedRecordSupport, statistics);
logger.debug("Build records response, instances = {}, instances with srs records = {}.", batch.size(), records.size());
Expand All @@ -649,10 +667,11 @@ private Future<Response> buildRecordsResponse(Request request, String requestId,
} else {
response = responseHelper.buildFailureResponse(oaipmh, request);
}

promise.complete(response);
instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant())
.onComplete(x -> promise.complete(response));
} catch (Exception e) {
handleException(promise, e);
instancesService.updateRequestUpdatedDateAndStatistics(requestId, lastUpdateDate, statistics, request.getTenant())
.onComplete(x -> handleException(promise, e));
}
return promise.future();
}
Expand Down Expand Up @@ -721,6 +740,7 @@ private ResumptionTokenType buildResumptionTokenFromRequest(Request request, Str
String nextInstanceId) {
long cursor = request.getOffset();
if (nextInstanceId == null) {
logHarvestingCompletion();
return new ResumptionTokenType()
.withValue("")
.withCursor(BigInteger.valueOf(cursor));
Expand All @@ -746,6 +766,11 @@ private ResumptionTokenType buildResumptionTokenFromRequest(Request request, Str
.withCursor(BigInteger.valueOf(cursor));
}

private void logHarvestingCompletion() {
logger.info("Harvesting completed. Number of processed instances: {}.", batchesSizeCounter.get());
batchesSizeCounter.setRelease(0);
}

private RecordType createRecord(Request request, JsonObject instance, String instanceId) {
String identifierPrefix = request.getIdentifierPrefix();
return new RecordType().withHeader(createHeader(instance, request).withIdentifier(getIdentifier(identifierPrefix, instanceId)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,45 @@
<changeSet id="2022-05-18--16-00-add-extended-statistic-tables" author="Viachaslau Khandramai">
<renameColumn newColumnName="suppressed_instances_counter" oldColumnName="supressed_instances_counter" tableName="request_metadata_lb"/>
<createTable tableName="failed_to_save_instances_ids">
<column name="request_id" type="uuid">
<column name="id" type="uuid">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="request_id" type="uuid">
<constraints nullable="false"/>
</column>
<column name="instance_id" type="uuid">
<constraints nullable="false"/>
</column>
</createTable>
<createTable tableName="failed_instances_ids">
<column name="request_id" type="uuid">
<column name="id" type="uuid">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="request_id" type="uuid">
<constraints nullable="false"/>
</column>
<column name="instance_id" type="uuid">
<constraints nullable="false"/>
</column>
</createTable>
<createTable tableName="skipped_instances_ids">
<column name="request_id" type="uuid">
<column name="id" type="uuid">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="request_id" type="uuid">
<constraints nullable="false"/>
</column>
<column name="instance_id" type="uuid">
<constraints nullable="false"/>
</column>
</createTable>
<createTable tableName="suppressed_from_discovery_instances_ids">
<column name="request_id" type="uuid">
<column name="id" type="uuid">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="request_id" type="uuid">
<constraints nullable="false"/>
</column>
<column name="instance_id" type="uuid">
<constraints nullable="false"/>
</column>
Expand Down

0 comments on commit 4132811

Please sign in to comment.