
[MODAUD-194]. Revert consumers to use promises, apply other recommendations
BKadirkhodjaev committed Nov 13, 2024
1 parent a16ec01 commit 6f2e51b
Showing 13 changed files with 98 additions and 63 deletions.
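
In essence, each consumer goes back from returning a composed Future to completing an explicit Promise. A minimal sketch of the reverted pattern (names such as service and tenantId are illustrative stand-ins for the handler's injected dependencies, not the module's exact code): the Promise lets both the success path and the duplicate-event path complete the returned Future, whereas the previous chain (save(...).onSuccess(...).onFailure(...).map(event.getId())) left a DuplicateEventException as a failure, because onFailure only logs and map applies to successful results only.

  // Sketch only, assuming a Vert.x 4 consumer; service and tenantId are hypothetical.
  public Future<String> handle(KafkaConsumerRecord<String, String> record) {
    var result = Promise.<String>promise();
    var event = new JsonObject(record.value()).mapTo(InvoiceAuditEvent.class);
    service.saveInvoiceAuditEvent(event, tenantId)
      .onSuccess(rows -> result.complete(event.getId()))
      .onFailure(e -> {
        if (e instanceof DuplicateEventException) {
          result.complete(event.getId()); // duplicate: already processed, ack and move on
        } else {
          result.fail(e);                 // real error: fail so the record can be retried
        }
      });
    return result.future();
  }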

InvoiceEventsDaoImpl.java
@@ -51,14 +51,14 @@ public InvoiceEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(InvoiceAuditEvent invoiceAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving Invoice AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(InvoiceAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving Invoice AuditEvent with invoice id: {}", event.getInvoiceId());
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    return makeSaveCall(query, invoiceAuditEvent, tenantId)
+    return makeSaveCall(query, event, tenantId)
       .onSuccess(rows -> LOGGER.info("save:: Saved Invoice AuditEvent with tenant id : {}", tenantId))
       .onFailure(e -> LOGGER.error("Failed to save record with id: {} for invoice id: {} in to table {}",
-        invoiceAuditEvent.getId(), invoiceAuditEvent.getInvoiceId(), TABLE_NAME, e));
+        event.getId(), event.getInvoiceId(), TABLE_NAME, e));
   }
 
   @Override

InvoiceLineEventsDaoImpl.java
@@ -53,14 +53,14 @@ public InvoiceLineEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(InvoiceLineAuditEvent invoiceLineAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving InvoiceLine AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(InvoiceLineAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving InvoiceLine AuditEvent with invoice line id: {}", event.getInvoiceLineId());
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    return makeSaveCall(query, invoiceLineAuditEvent, tenantId)
+    return makeSaveCall(query, event, tenantId)
       .onSuccess(rows -> LOGGER.info("save:: Saved InvoiceLine AuditEvent with tenant id : {}", tenantId))
       .onFailure(e -> LOGGER.error("Failed to save record with id: {} for invoice line id: {} in to table {}",
-        invoiceLineAuditEvent.getId(), invoiceLineAuditEvent.getInvoiceLineId(), TABLE_NAME, e));
+        event.getId(), event.getInvoiceLineId(), TABLE_NAME, e));
   }
 
   @Override

OrderEventsDaoImpl.java
@@ -52,13 +52,12 @@ public OrderEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(OrderAuditEvent orderAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving Order AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(OrderAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving Order AuditEvent with order id: {}", event.getOrderId());
     Promise<RowSet<Row>> promise = Promise.promise();
-    LOGGER.debug("formatDBTableName:: Formatting DB Table Name with tenant id : {}", tenantId);
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    makeSaveCall(promise, query, orderAuditEvent, tenantId);
+    makeSaveCall(promise, query, event, tenantId);
     LOGGER.info("save:: Saved Order AuditEvent with tenant id : {}", tenantId);
     return promise.future();
   }
@@ -125,5 +124,4 @@ private OrderAuditEvent mapRowToOrderEvent(Row row) {
       .withActionDate(Date.from(row.getLocalDateTime(ACTION_DATE_FIELD).toInstant(ZoneOffset.UTC)))
      .withOrderSnapshot(JsonObject.mapFrom(row.getValue(MODIFIED_CONTENT_FIELD)));
   }
-
 }

OrderLineEventsDaoImpl.java
@@ -54,13 +54,12 @@ public OrderLineEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(OrderLineAuditEvent orderLineAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving OrderLine AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(OrderLineAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving OrderLine AuditEvent with order line id : {}", event.getOrderLineId());
     Promise<RowSet<Row>> promise = Promise.promise();
-    LOGGER.debug("formatDBTableName:: Formatting DB Table Name with tenant id : {}", tenantId);
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    makeSaveCall(promise, query, orderLineAuditEvent, tenantId);
+    makeSaveCall(promise, query, event, tenantId);
     LOGGER.info("save:: Saved OrderLine AuditEvent with tenant id : {}", tenantId);
     return promise.future();
   }

OrganizationEventsDaoImpl.java
@@ -51,14 +51,14 @@ public OrganizationEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(OrganizationAuditEvent organizationAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Saving Organization AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(OrganizationAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Saving Organization AuditEvent with organization id : {}", event.getOrganizationId());
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-    return makeSaveCall(query, organizationAuditEvent, tenantId)
+    return makeSaveCall(query, event, tenantId)
       .onSuccess(rows -> LOGGER.info("save:: Saved Organization AuditEvent with tenant id : {}", tenantId))
       .onFailure(e -> LOGGER.error("Failed to save record with id: {} for organization id: {} in to table {}",
-        organizationAuditEvent.getId(), organizationAuditEvent.getOrganizationId(), TABLE_NAME, e));
+        event.getId(), event.getOrganizationId(), TABLE_NAME, e));
   }
 
   @Override

PieceEventsDaoImpl.java
@@ -66,16 +66,13 @@ public PieceEventsDaoImpl(PostgresClientFactory pgClientFactory) {
   }
 
   @Override
-  public Future<RowSet<Row>> save(PieceAuditEvent pieceAuditEvent, String tenantId) {
-    LOGGER.debug("save:: Trying to save Piece AuditEvent with tenant id : {}", tenantId);
+  public Future<RowSet<Row>> save(PieceAuditEvent event, String tenantId) {
+    LOGGER.debug("save:: Trying to save Piece AuditEvent with piece id : {}", event.getPieceId());
     Promise<RowSet<Row>> promise = Promise.promise();
-
-    LOGGER.debug("formatDBTableName:: Formatting DB Table Name with tenant id : {}", tenantId);
     String logTable = formatDBTableName(tenantId, TABLE_NAME);
     String query = format(INSERT_SQL, logTable);
-
-    makeSaveCall(promise, query, pieceAuditEvent, tenantId);
-    LOGGER.info("save:: Saved Piece AuditEvent for pieceId={} in tenant id={}", pieceAuditEvent.getPieceId(), tenantId);
+    makeSaveCall(promise, query, event, tenantId);
+    LOGGER.info("save:: Saved Piece AuditEvent for pieceId={} in tenant id={}", event.getPieceId(), tenantId);
     return promise.future();
   }
 
@@ -158,5 +155,4 @@ private void makeSaveCall(Promise<RowSet<Row>> promise, String query, PieceAudit
       promise.fail(e);
     }
   }
-
 }

ErrorUtils.java
@@ -12,6 +12,7 @@
 import java.util.Collections;
 
 public class ErrorUtils {
+
   private ErrorUtils(){
   }
 
@@ -26,7 +27,7 @@ public static Errors buildErrors(String statusCode, Throwable throwable) {
   }
 
   public static <T> Future<T> handleFailures(Throwable throwable, String id) {
-    return (throwable instanceof PgException pgException && pgException.getCode().equals(UNIQUE_CONSTRAINT_VIOLATION_CODE)) ?
+    return (throwable instanceof PgException pgException && pgException.getSqlState().equals(UNIQUE_CONSTRAINT_VIOLATION_CODE)) ?
       Future.failedFuture(new DuplicateEventException(String.format("Event with id=%s is already processed.", id))) :
       Future.failedFuture(throwable);
   }
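
Among the "other recommendations": the Vert.x PG client exposes the server's SQLSTATE through PgException.getSqlState(), so the unique-constraint check now compares against that rather than getCode(). In PostgreSQL the unique_violation SQLSTATE is 23505, which is presumably what UNIQUE_CONSTRAINT_VIOLATION_CODE holds. A hedged usage sketch (the dao variable is illustrative, not from this commit):

  // Usage sketch: recover() lets a caller translate a unique-constraint
  // violation into DuplicateEventException while passing any other failure
  // through unchanged.
  dao.save(event, tenantId)
    .recover(throwable -> ErrorUtils.handleFailures(throwable, event.getId()));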

InvoiceEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,25 @@ public InvoiceEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(InvoiceAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Invoice audit event with id: {} for invoice id: {}", event.getId(), event.getInvoiceId());
-
-    return invoiceAuditEventsService.saveInvoiceAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Invoice audit event with id: {} has been processed for invoice id: {}", event.getId(), event.getInvoiceId()))
+    invoiceAuditEventsService.saveInvoiceAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Invoice audit event with id: {} has been processed for invoice id: {}", event.getId(), event.getInvoiceId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Invoice audit event with id: {} for invoice id: {} received, skipped processing", event.getId(), event.getInvoiceId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Invoice audit event with id: {} for invoice id: {} has been failed", event.getId(), event.getInvoiceId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }

InvoiceLineEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,26 @@ public InvoiceLineEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(InvoiceLineAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Invoice Line audit event with id: {} for invoice line id: {}", event.getId(), event.getInvoiceLineId());
-
-    return invoiceLineAuditEventsService.saveInvoiceLineAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Invoice Line audit event with id: {} has been processed for invoice line id: {}", event.getId(), event.getInvoiceLineId()))
+    invoiceLineAuditEventsService.saveInvoiceLineAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Invoice Line audit event with id: {} has been processed for invoice line id: {}",
+          event.getId(), event.getInvoiceLineId());
+        result.complete(event.getId());
+      })
      .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Invoice Line audit event with id: {} for invoice line id: {} received, skipped processing", event.getId(), event.getInvoiceLineId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Invoice Line audit event with id: {} for invoice line id: {} has been failed", event.getId(), event.getInvoiceLineId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }

OrderEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,25 @@ public OrderEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(OrderAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Order audit event with id: {} for order id: {}", event.getId(), event.getOrderId());
-
-    return orderAuditEventsService.saveOrderAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Order audit event with id: {} has been processed for order id: {}", event.getId(), event.getOrderId()))
+    orderAuditEventsService.saveOrderAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Order audit event with id: {} has been processed for order id: {}", event.getId(), event.getOrderId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Order audit event with id: {} for order id: {} received, skipped processing", event.getId(), event.getOrderId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Order audit event with id: {} for order id: {} has been failed", event.getId(), event.getOrderId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }

OrderLineEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,29 @@ public OrderLineEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(OrderLineAuditEvent.class);
-    LOGGER.info("handle:: Starting processing of Order Line audit event with id: {} for order id: {} and order line id: {}", event.getId(), event.getOrderId(), event.getOrderLineId());
-
-    return orderLineAuditEventsService.saveOrderLineAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Order Line audit event with id: {} has been processed for order id: {} and order line id: {}", event.getId(), event.getOrderId(), event.getOrderLineId()))
+    LOGGER.info("handle:: Starting processing of Order Line audit event with id: {} for order id: {} and order line id: {}",
+      event.getId(), event.getOrderId(), event.getOrderLineId());
+    orderLineAuditEventsService.saveOrderLineAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Order Line audit event with id: {} has been processed for order id: {} and order line id: {}",
+          event.getId(), event.getOrderId(), event.getOrderLineId());
+        result.complete(event.getId());
+      })
       .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
-          LOGGER.info("handle:: Duplicate Order Line audit event with id: {} for order id: {} and order line id: {} received, skipped processing", event.getId(), event.getOrderId(), event.getOrderLineId());
+          LOGGER.info("handle:: Duplicate Order Line audit event with id: {} for order id: {} and order line id: {} received, skipped processing",
+            event.getId(), event.getOrderId(), event.getOrderLineId());
+          result.complete(event.getId());
         } else {
-          LOGGER.error("Processing of Order Line audit event with id: {} for order id: {} and order line id: {} has been failed", event.getId(), event.getOrderId(), event.getOrderLineId(), e);
+          LOGGER.error("Processing of Order Line audit event with id: {} for order id: {} and order line id: {} has been failed",
+            event.getId(), event.getOrderId(), event.getOrderLineId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
   }
 }

OrganizationEventsHandler.java
@@ -1,6 +1,7 @@
 package org.folio.verticle.acquisition.consumers;
 
 import io.vertx.core.Future;
+import io.vertx.core.Promise;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
@@ -30,20 +31,25 @@ public OrganizationEventsHandler(Vertx vertx,
 
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
+    var result = Promise.<String>promise();
     var kafkaHeaders = kafkaConsumerRecord.headers();
     var okapiConnectionParams = new OkapiConnectionParams(KafkaHeaderUtils.kafkaHeadersToMap(kafkaHeaders), vertx);
     var event = new JsonObject(kafkaConsumerRecord.value()).mapTo(OrganizationAuditEvent.class);
     LOGGER.info("handle:: Starting processing of Organization audit event with id: {} for organization id: {}", event.getId(), event.getOrganizationId());
-
-    return organizationAuditEventsService.saveOrganizationAuditEvent(event, okapiConnectionParams.getTenantId())
-      .onSuccess(ar -> LOGGER.info("handle:: Organization audit event with id: {} has been processed for organization id: {}", event.getId(), event.getOrganizationId()))
+    organizationAuditEventsService.saveOrganizationAuditEvent(event, okapiConnectionParams.getTenantId())
+      .onSuccess(ar -> {
+        LOGGER.info("handle:: Organization audit event with id: {} has been processed for organization id: {}", event.getId(), event.getOrganizationId());
+        result.complete(event.getId());
+      })
      .onFailure(e -> {
         if (e instanceof DuplicateEventException) {
           LOGGER.info("handle:: Duplicate Organization audit event with id: {} for organization id: {} received, skipped processing", event.getId(), event.getOrganizationId());
+          result.complete(event.getId());
         } else {
           LOGGER.error("Processing of Organization audit event with id: {} for organization id: {} has been failed", event.getId(), event.getOrganizationId(), e);
+          result.fail(e);
         }
-      })
-      .map(event.getId());
+      });
+    return result.future();
  }
 }