diff --git a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceEventsDaoImpl.java b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceEventsDaoImpl.java
index e8c73f6..3801f20 100644
--- a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceEventsDaoImpl.java
+++ b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceEventsDaoImpl.java
@@ -51,14 +51,14 @@ public InvoiceEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(InvoiceAuditEvent invoiceAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving Invoice AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(InvoiceAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving Invoice AuditEvent with invoice id: {}", event.getInvoiceId());
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    return makeSaveCall(query, invoiceAuditEvent, tenantId)
+    return makeSaveCall(query, event, tenantId)
       .onSuccess(rows -> LOGGER.info("save:: Saved Invoice AuditEvent with tenant id : {}", tenantId))
       .onFailure(e -> LOGGER.error("Failed to save record with id: {} for invoice id: {} in to table {}",
-        invoiceAuditEvent.getId(), invoiceAuditEvent.getInvoiceId(), TABLE_NAME, e));
+        event.getId(), event.getInvoiceId(), TABLE_NAME, e));
   }
 
   @Override
diff --git a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceLineEventsDaoImpl.java b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceLineEventsDaoImpl.java
index bf7fa67..7cfc1ad 100644
--- a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceLineEventsDaoImpl.java
+++ b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/InvoiceLineEventsDaoImpl.java
@@ -53,14 +53,14 @@ public InvoiceLineEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(InvoiceLineAuditEvent invoiceLineAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving InvoiceLine AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(InvoiceLineAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving InvoiceLine AuditEvent with invoice line id: {}", event.getInvoiceLineId());
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    return makeSaveCall(query, invoiceLineAuditEvent, tenantId)
+    return makeSaveCall(query, event, tenantId)
       .onSuccess(rows -> LOGGER.info("save:: Saved InvoiceLine AuditEvent with tenant id : {}", tenantId))
       .onFailure(e -> LOGGER.error("Failed to save record with id: {} for invoice line id: {} in to table {}",
-        invoiceLineAuditEvent.getId(), invoiceLineAuditEvent.getInvoiceLineId(), TABLE_NAME, e));
+        event.getId(), event.getInvoiceLineId(), TABLE_NAME, e));
   }
 
   @Override
diff --git a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderEventsDaoImpl.java b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderEventsDaoImpl.java
index 384a9aa..cab0a0e 100644
--- a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderEventsDaoImpl.java
+++ b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderEventsDaoImpl.java
@@ -52,13 +52,12 @@ public OrderEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(OrderAuditEvent orderAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving Order AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(OrderAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving Order AuditEvent with order id: {}", event.getOrderId());
     Promise<RowSet<Row>> promise = Promise.promise();
-    LOGGER.debug("formatDBTableName:: Formatting DB Table Name with tenant id : {}", tenantId);
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    makeSaveCall(promise, query, orderAuditEvent, tenantId);
+    makeSaveCall(promise, query, event, tenantId);
     LOGGER.info("save:: Saved Order AuditEvent with tenant id : {}", tenantId);
     return promise.future();
   }
@@ -125,5 +124,4 @@ private OrderAuditEvent mapRowToOrderEvent(Row row) {
       .withActionDate(Date.from(row.getLocalDateTime(ACTION_DATE_FIELD).toInstant(ZoneOffset.UTC)))
       .withOrderSnapshot(JsonObject.mapFrom(row.getValue(MODIFIED_CONTENT_FIELD)));
   }
-
 }
diff --git a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderLineEventsDaoImpl.java b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderLineEventsDaoImpl.java
index 44a088a..bfe2321 100644
--- a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderLineEventsDaoImpl.java
+++ b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrderLineEventsDaoImpl.java
@@ -54,13 +54,12 @@ public OrderLineEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(OrderLineAuditEvent orderLineAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving OrderLine AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(OrderLineAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving OrderLine AuditEvent with order line id : {}", event.getOrderLineId());
     Promise<RowSet<Row>> promise = Promise.promise();
-    LOGGER.debug("formatDBTableName:: Formatting DB Table Name with tenant id : {}", tenantId);
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    makeSaveCall(promise, query, orderLineAuditEvent, tenantId);
+    makeSaveCall(promise, query, event, tenantId);
     LOGGER.info("save:: Saved OrderLine AuditEvent with tenant id : {}", tenantId);
     return promise.future();
   }
diff --git a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrganizationEventsDaoImpl.java b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrganizationEventsDaoImpl.java
index 1c17390..4a0e668 100644
--- a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrganizationEventsDaoImpl.java
+++ b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/OrganizationEventsDaoImpl.java
@@ -51,14 +51,14 @@ public OrganizationEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(OrganizationAuditEvent organizationAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving Organization AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(OrganizationAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving Organization AuditEvent with organization id : {}", event.getOrganizationId());
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    return makeSaveCall(query, organizationAuditEvent, tenantId)
+    return makeSaveCall(query, event, tenantId)
       .onSuccess(rows -> LOGGER.info("save:: Saved Organization AuditEvent with tenant id : {}", tenantId))
       .onFailure(e -> LOGGER.error("Failed to save record with id: {} for organization id: {} in to table {}",
-        organizationAuditEvent.getId(), organizationAuditEvent.getOrganizationId(), TABLE_NAME, e));
+        event.getId(), event.getOrganizationId(), TABLE_NAME, e));
   }
 
   @Override
diff --git a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/PieceEventsDaoImpl.java b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/PieceEventsDaoImpl.java
index 85395ae..c5117de 100644
--- a/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/PieceEventsDaoImpl.java
+++ b/mod-audit-server/src/main/java/org/folio/dao/acquisition/impl/PieceEventsDaoImpl.java
@@ -66,16 +66,13 @@ public PieceEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(PieceAuditEvent pieceAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Trying to save Piece AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(PieceAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Trying to save Piece AuditEvent with piece id : {}", event.getPieceId());
     Promise<RowSet<Row>> promise = Promise.promise();
-
-    LOGGER.debug("formatDBTableName:: Formatting DB Table Name with tenant id : {}", tenantId);
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-
-    makeSaveCall(promise, query, pieceAuditEvent, tenantId);
-    LOGGER.info("save:: Saved Piece AuditEvent for pieceId={} in tenant id={}", pieceAuditEvent.getPieceId(), tenantId);
+    makeSaveCall(promise, query, event, tenantId);
+    LOGGER.info("save:: Saved Piece AuditEvent for pieceId={} in tenant id={}", event.getPieceId(), tenantId);
     return promise.future();
   }
 
@@ -158,5 +155,4 @@ private void makeSaveCall(Promise<RowSet<Row>> promise, String query, PieceAudit
       promise.fail(e);
     }
   }
-
 }
diff --git a/mod-audit-server/src/main/java/org/folio/util/ErrorUtils.java b/mod-audit-server/src/main/java/org/folio/util/ErrorUtils.java
index 4d7b8e7..df37cca 100644
--- a/mod-audit-server/src/main/java/org/folio/util/ErrorUtils.java
+++ b/mod-audit-server/src/main/java/org/folio/util/ErrorUtils.java
@@ -12,6 +12,7 @@
 import java.util.Collections;
 
 public class ErrorUtils {
+  private ErrorUtils(){ }
 
@@ -26,7 +27,7 @@ public static Errors buildErrors(String statusCode, Throwable throwable) {
   }
 
   public static <T> Future<T> handleFailures(Throwable throwable, String id) {
-    return (throwable instanceof PgException pgException && pgException.getCode().equals(UNIQUE_CONSTRAINT_VIOLATION_CODE)) ?
+    return (throwable instanceof PgException pgException && pgException.getSqlState().equals(UNIQUE_CONSTRAINT_VIOLATION_CODE)) ?
       Future.failedFuture(new DuplicateEventException(String.format("Event with id=%s is already processed.", id))) :
       Future.failedFuture(throwable);
   }
diff --git a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceEventsHandler.java b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceEventsHandler.java
index 145aca9..a204e3d 100644
--- a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceEventsHandler.java
+++ b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,25 @@ public InvoiceEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(InvoiceAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Invoice audit event with id: {} for invoice id: {}", event.getId(), event.getInvoiceId());
-
-    return invoiceAuditEventsService.saveInvoiceAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Invoice audit event with id: {} has been processed for invoice id: {}", event.getId(), event.getInvoiceId()))
+    invoiceAuditEventsService.saveInvoiceAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Invoice audit event with id: {} has been processed for invoice id: {}", event.getId(), event.getInvoiceId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Invoice audit event with id: {} for invoice id: {} received, skipped processing", event.getId(), event.getInvoiceId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Invoice audit event with id: {} for invoice id: {} has been failed", event.getId(), event.getInvoiceId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }
diff --git a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceLineEventsHandler.java b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceLineEventsHandler.java
index cab6f33..75ce87e 100644
--- a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceLineEventsHandler.java
+++ b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/InvoiceLineEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,26 @@ public InvoiceLineEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(InvoiceLineAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Invoice Line audit event with id: {} for invoice line id: {}", event.getId(), event.getInvoiceLineId());
-
-    return invoiceLineAuditEventsService.saveInvoiceLineAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Invoice Line audit event with id: {} has been processed for invoice line id: {}", event.getId(), event.getInvoiceLineId()))
+    invoiceLineAuditEventsService.saveInvoiceLineAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Invoice Line audit event with id: {} has been processed for invoice line id: {}",
+          event.getId(), event.getInvoiceLineId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Invoice Line audit event with id: {} for invoice line id: {} received, skipped processing", event.getId(), event.getInvoiceLineId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Invoice Line audit event with id: {} for invoice line id: {} has been failed", event.getId(), event.getInvoiceLineId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }
diff --git a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderEventsHandler.java b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderEventsHandler.java
index 208a91b..b299cae 100644
--- a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderEventsHandler.java
+++ b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,25 @@ public OrderEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(OrderAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Order audit event with id: {} for order id: {}", event.getId(), event.getOrderId());
-
-    return orderAuditEventsService.saveOrderAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Order audit event with id: {} has been processed for order id: {}", event.getId(), event.getOrderId()))
+    orderAuditEventsService.saveOrderAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Order audit event with id: {} has been processed for order id: {}", event.getId(), event.getOrderId());
+        result.complete(event.getId());
+      })
      .onFailure(e -> {
        if (e instanceof DuplicateEventException) {
          LOGGER.info("handle:: Duplicate Order audit event with id: {} for order id: {} received, skipped processing", event.getId(), event.getOrderId());
+          result.complete(event.getId());
        } else {
          LOGGER.error("Processing of Order audit event with id: {} for order id: {} has been failed", event.getId(), event.getOrderId(), e);
+          result.fail(e);
        }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }
diff --git a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderLineEventsHandler.java b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderLineEventsHandler.java
index 28d98b2..c557af7 100644
--- a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderLineEventsHandler.java
+++ b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrderLineEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,29 @@ public OrderLineEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(OrderLineAuditEvent.class);
-    LOGGER.info("handle:: Starting processing of Order Line audit event with id: {} for order id: {} and order line id: {}", event.getId(), event.getOrderId(), event.getOrderLineId());
-
-    return orderLineAuditEventsService.saveOrderLineAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Order Line audit event with id: {} has been processed for order id: {} and order line id: {}", event.getId(), event.getOrderId(), event.getOrderLineId()))
+    LOGGER.info("handle:: Starting processing of Order Line audit event with id: {} for order id: {} and order line id: {}",
+      event.getId(), event.getOrderId(), event.getOrderLineId());
+    orderLineAuditEventsService.saveOrderLineAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Order Line audit event with id: {} has been processed for order id: {} and order line id: {}",
+          event.getId(), event.getOrderId(), event.getOrderLineId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
-          LOGGER.info("handle:: Duplicate Order Line audit event with id: {} for order id: {} and order line id: {} received, skipped processing", event.getId(), event.getOrderId(), event.getOrderLineId());
+          LOGGER.info("handle:: Duplicate Order Line audit event with id: {} for order id: {} and order line id: {} received, skipped processing",
+            event.getId(), event.getOrderId(), event.getOrderLineId());
+          result.complete(event.getId());
         } else {
-          LOGGER.error("Processing of Order Line audit event with id: {} for order id: {} and order line id: {} has been failed", event.getId(), event.getOrderId(), event.getOrderLineId(), e);
+          LOGGER.error("Processing of Order Line audit event with id: {} for order id: {} and order line id: {} has been failed",
+            event.getId(), event.getOrderId(), event.getOrderLineId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }
diff --git a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrganizationEventsHandler.java b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrganizationEventsHandler.java
index 807ef87..94d1b0b 100644
--- a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrganizationEventsHandler.java
+++ b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/OrganizationEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,25 @@ public OrganizationEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(OrganizationAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Organization audit event with id: {} for organization id: {}", event.getId(), event.getOrganizationId());
-
-    return organizationAuditEventsService.saveOrganizationAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Organization audit event with id: {} has been processed for organization id: {}", event.getId(), event.getOrganizationId()))
+    organizationAuditEventsService.saveOrganizationAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Organization audit event with id: {} has been processed for organization id: {}", event.getId(), event.getOrganizationId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Organization audit event with id: {} for organization id: {} received, skipped processing", event.getId(), event.getOrganizationId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Organization audit event with id: {} for organization id: {} has been failed", event.getId(), event.getOrganizationId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }
diff --git a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/PieceEventsHandler.java b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/PieceEventsHandler.java
index 3f08f3b..942e274 100644
--- a/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/PieceEventsHandler.java
+++ b/mod-audit-server/src/main/java/org/folio/verticle/acquisition/consumers/PieceEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,25 @@ public PieceEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(PieceAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Piece audit event with id: {} for piece id: {}", event.getId(), event.getPieceId());
-
-    return pieceAuditEventsService.savePieceAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Piece audit event with id: {} has been processed for piece id: {}", event.getId(), event.getPieceId()))
+    pieceAuditEventsService.savePieceAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Piece audit event with id: {} has been processed for piece id: {}", event.getId(), event.getPieceId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Piece audit event with id: {} for piece id: {} received, skipped processing", event.getId(), event.getPieceId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Piece audit event with id: {} for piece id: {} has been failed", event.getId(), event.getPieceId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }
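Not part of the diff: a minimal, self-contained sketch of the duplicate-tolerant handler pattern the consumers above switch to. `AuditEventService` and `DuplicateTolerantHandler` are hypothetical stand-ins for the module's real service and handler classes; the point is that the returned future completes for both a successful save and a `DuplicateEventException`, so a re-delivered record is acknowledged instead of failing the consumer, while any other error fails the future.

```java
// Illustrative sketch only, assuming a Vert.x core dependency.
import io.vertx.core.Future;
import io.vertx.core.Promise;

// Local stand-in for the exception raised on a unique-constraint violation.
class DuplicateEventException extends RuntimeException {
  DuplicateEventException(String message) {
    super(message);
  }
}

// Hypothetical service: saves the event, fails with DuplicateEventException on a repeated id.
interface AuditEventService {
  Future<Void> save(String eventId, String tenantId);
}

class DuplicateTolerantHandler {
  private final AuditEventService service;

  DuplicateTolerantHandler(AuditEventService service) {
    this.service = service;
  }

  Future<String> handle(String eventId, String tenantId) {
    Promise<String> result = Promise.promise();
    service.save(eventId, tenantId)
      .onSuccess(v -> result.complete(eventId))   // saved: acknowledge the record
      .onFailure(e -> {
        if (e instanceof DuplicateEventException) {
          result.complete(eventId);               // already processed: acknowledge, skip
        } else {
          result.fail(e);                         // unexpected error: propagate the failure
        }
      });
    return result.future();
  }
}
```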