Skip to content

Commit

Permalink
[MODINVOSTO-187] Use string queries instead of conn save and update a…
Browse files Browse the repository at this point in the history
…s it is only for id-jsonb fields
  • Loading branch information
Saba-Zedginidze-EPAM committed Nov 5, 2024
1 parent 9e6c55a commit 9b833ed
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface AuditOutboxEventLogDAO {

Future<List<OutboxEventLog>> getEventLogs(Conn conn, String tenantId);

Future<String> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId);
Future<Void> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId);

Future<Integer> deleteEventLogs(Conn conn, List<String> eventIds, String tenantId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@

import org.folio.rest.jaxrs.model.OutboxEventLog;
import org.folio.rest.persist.Conn;
import org.folio.rest.persist.Criteria.Criterion;
import org.folio.rest.persist.Criteria.Limit;
import org.folio.rest.persist.interfaces.Results;
import org.folio.service.util.OutboxEventFields;

import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Tuple;
import lombok.extern.log4j.Log4j2;
import one.util.streamex.StreamEx;

@Log4j2
public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO {

public static final String OUTBOX_TABLE_NAME = "outbox_event_log";
private static final String BATCH_DELETE = "DELETE from %s where event_id = ANY ($1)";
private static final String SELECT_SQL = "SELECT * FROM %s LIMIT 1000";
private static final String INSERT_SQL = "INSERT INTO %s (event_id, entity_type, action, payload) VALUES ($1, $2, $3, $4)";
private static final String BATCH_DELETE_SQL = "DELETE from %s where event_id = ANY ($1)";

/**
* Get all event logs from outbox table.
Expand All @@ -32,8 +34,8 @@ public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO {
public Future<List<OutboxEventLog>> getEventLogs(Conn conn, String tenantId) {
log.trace("getEventLogs:: Fetching event logs from outbox table for tenantId: '{}'", tenantId);
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME);
return conn.get(tableName, OutboxEventLog.class, new Criterion().setLimit(new Limit(1000)))
.map(Results::getResults)
return conn.execute(SELECT_SQL.formatted(tableName))
.map(rows -> StreamEx.of(rows.iterator()).map(this::convertDbRowToOutboxEventLog).toList())
.onFailure(t -> log.warn("getEventLogs:: Failed to fetch event logs for tenantId: '{}'", tenantId, t));
}

Expand All @@ -45,11 +47,13 @@ public Future<List<OutboxEventLog>> getEventLogs(Conn conn, String tenantId) {
* @param tenantId the tenant id
* @return future of id of the inserted entity
*/
public Future<String> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) {
public Future<Void> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) {
log.debug("saveEventLog:: Saving event log to outbox table with eventId: '{}'", eventLog.getEventId());
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME);
return conn.save(tableName, eventLog)
.onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t));
Tuple params = Tuple.of(eventLog.getEventId(), eventLog.getEntityType().value(), eventLog.getAction(), eventLog.getPayload());
return conn.execute(INSERT_SQL.formatted(tableName), params)
.onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t))
.mapEmpty();
}

/**
Expand All @@ -64,9 +68,17 @@ public Future<Integer> deleteEventLogs(Conn conn, List<String> eventIds, String
log.debug("deleteEventLogs:: Deleting outbox logs by event ids in batch: '{}'", eventIds);
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME);
var param = eventIds.stream().map(UUID::fromString).toArray(UUID[]::new);
return conn.execute(BATCH_DELETE.formatted(tableName), Tuple.of(param))
return conn.execute(BATCH_DELETE_SQL.formatted(tableName), Tuple.of(param))
.map(SqlResult::rowCount)
.onFailure(t -> log.warn("deleteEventLogs: Failed to delete event logs by ids: '{}'", eventIds, t));
}

private OutboxEventLog convertDbRowToOutboxEventLog(Row row) {
return new OutboxEventLog()
.withEventId(row.getUUID(OutboxEventFields.EVENT_ID.getName()).toString())
.withEntityType(OutboxEventLog.EntityType.fromValue(row.getString(OutboxEventFields.ENTITY_TYPE.getName())))
.withAction(row.getString(OutboxEventFields.ACTION.getName()))
.withPayload(row.getString(OutboxEventFields.PAYLOAD.getName()));
}

}
7 changes: 1 addition & 6 deletions src/main/java/org/folio/rest/impl/AuditOutboxAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;

@Log4j2
public class AuditOutboxAPI implements InvoiceStorageAuditOutbox {

@Autowired
Expand All @@ -29,9 +27,6 @@ public AuditOutboxAPI() {
public void postInvoiceStorageAuditOutboxProcess(Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
auditOutboxService.processOutboxEventLogs(okapiHeaders, vertxContext)
.onSuccess(res -> asyncResultHandler.handle(Future.succeededFuture(Response.ok().build())))
.onFailure(cause -> {
log.warn("postInvoiceStorageAuditOutboxProcess:: Processing of outbox events table has failed", cause);
asyncResultHandler.handle(Future.failedFuture(cause));
});
.onFailure(cause -> asyncResultHandler.handle(Future.failedFuture(cause)));
}
}
4 changes: 2 additions & 2 deletions src/main/java/org/folio/service/audit/AuditEventProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Future<Void> sendInvoiceEvent(Invoice invoice, InvoiceAuditEvent.Action e
public Future<Void> sendInvoiceLineEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action action, Map<String, String> okapiHeaders) {
var event = getAuditEvent(invoiceLine, action);
log.info("sendInvoiceLineEvent:: Sending event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId());
return sendToKafka(EventTopic.ACQ_INVOICE_LINE_CHANGED, event.getInvoiceId(), event, okapiHeaders)
return sendToKafka(EventTopic.ACQ_INVOICE_LINE_CHANGED, event.getInvoiceLineId(), event, okapiHeaders)
.onFailure(t -> log.warn("sendInvoiceLineEvent:: Failed to send event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId(), t));
}

Expand All @@ -62,7 +62,7 @@ private InvoiceAuditEvent getAuditEvent(Invoice invoice, InvoiceAuditEvent.Actio
.withInvoiceSnapshot(invoice.withMetadata(null));
}

private InvoiceAuditEvent getAuditEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action eventAction) {
private InvoiceLineAuditEvent getAuditEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action eventAction) {
return new InvoiceLineAuditEvent()
.withId(UUID.randomUUID().toString())
.withAction(eventAction)
Expand Down
35 changes: 18 additions & 17 deletions src/main/java/org/folio/service/audit/AuditOutboxService.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,21 @@ public Future<Integer> processOutboxEventLogs(Map<String, String> okapiHeaders,
var tenantId = TenantTool.tenantId(okapiHeaders);
return new DBClient(vertxContext, okapiHeaders).getPgClient()
.withTrans(conn -> internalLockDAO.selectWithLocking(conn, OUTBOX_LOCK_NAME, tenantId)
.compose(retrievedCount -> outboxEventLogDAO.getEventLogs(conn, tenantId))
.compose(logs -> {
if (CollectionUtils.isEmpty(logs)) {
log.info("processOutboxEventLogs: No logs found in outbox table");
return Future.succeededFuture(0);
}
log.info("processOutboxEventLogs: {} logs found in outbox table, sending to kafka", logs.size());
return GenericCompositeFuture.join(sendEventLogsToKafka(logs, okapiHeaders))
.map(logs.stream().map(OutboxEventLog::getEventId).toList())
.compose(eventIds -> outboxEventLogDAO.deleteEventLogs(conn, eventIds, tenantId))
.onSuccess(count -> log.info("processOutboxEventLogs:: {} logs have been deleted from outbox table", count))
.onFailure(ex -> log.error("Logs deletion failed", ex));
})
);
.compose(retrievedCount -> outboxEventLogDAO.getEventLogs(conn, tenantId))
.compose(logs -> {
if (CollectionUtils.isEmpty(logs)) {
log.info("processOutboxEventLogs: No logs found in outbox table");
return Future.succeededFuture(0);
}
log.info("processOutboxEventLogs: {} logs found in outbox table, sending to kafka", logs.size());
return GenericCompositeFuture.join(sendEventLogsToKafka(logs, okapiHeaders))
.map(logs.stream().map(OutboxEventLog::getEventId).toList())
.compose(eventIds -> outboxEventLogDAO.deleteEventLogs(conn, eventIds, tenantId))
.onSuccess(count -> log.info("processOutboxEventLogs:: {} logs have been deleted from outbox table", count))
.onFailure(ex -> log.error("Logs deletion failed", ex));
})
.onSuccess(count -> log.info("processOutboxEventLogs:: Successfully processed outbox event logs: {}", count))
.onFailure(ex -> log.error("Failed to process outbox event logs", ex)));
}

private List<Future<Void>> sendEventLogsToKafka(List<OutboxEventLog> eventLogs, Map<String, String> okapiHeaders) {
Expand Down Expand Up @@ -86,7 +87,7 @@ private List<Future<Void>> sendEventLogsToKafka(List<OutboxEventLog> eventLogs,
* @param okapiHeaders okapi headers
* @return future with saved outbox log id in the same transaction
*/
public Future<String> saveInvoiceOutboxLog(Conn conn, Invoice entity, InvoiceAuditEvent.Action action, Map<String, String> okapiHeaders) {
public Future<Void> saveInvoiceOutboxLog(Conn conn, Invoice entity, InvoiceAuditEvent.Action action, Map<String, String> okapiHeaders) {
return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.INVOICE, entity.getId(), entity);
}

Expand All @@ -99,11 +100,11 @@ public Future<String> saveInvoiceOutboxLog(Conn conn, Invoice entity, InvoiceAud
* @param okapiHeaders okapi headers
* @return future with saved outbox log id in the same transaction
*/
public Future<String> saveInvoiceLineOutboxLog(Conn conn, InvoiceLine entity, InvoiceLineAuditEvent.Action action, Map<String, String> okapiHeaders) {
public Future<Void> saveInvoiceLineOutboxLog(Conn conn, InvoiceLine entity, InvoiceLineAuditEvent.Action action, Map<String, String> okapiHeaders) {
return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.INVOICE_LINE, entity.getId(), entity);
}

private Future<String> saveOutboxLog(Conn conn, Map<String, String> okapiHeaders, String action, EntityType entityType, String entityId, Object entity) {
private Future<Void> saveOutboxLog(Conn conn, Map<String, String> okapiHeaders, String action, EntityType entityType, String entityId, Object entity) {
log.debug("saveOutboxLog:: Saving outbox log for {} with id: {}", entityType, entityId);
var eventLog = new OutboxEventLog()
.withEventId(UUID.randomUUID().toString())
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/folio/service/util/OutboxEventFields.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.folio.service.util;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public enum OutboxEventFields {

EVENT_ID("event_id"),
ENTITY_TYPE("entity_type"),
ACTION("action"),
PAYLOAD("payload");

private final String name;

}
4 changes: 2 additions & 2 deletions src/main/resources/templates/db_scripts/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@
{
"run": "after",
"snippetPath": "tables/create_audit_outbox_table.sql",
"fromModuleVersion": "mod-invoice-storage-5.9.0"
"fromModuleVersion": "mod-invoice-storage-6.0.0"
},
{
"run": "after",
"snippetPath": "tables/create_internal_lock_table.sql",
"fromModuleVersion": "mod-invoice-storage-5.9.0"
"fromModuleVersion": "mod-invoice-storage-6.0.0"
}
],
"tables": [
Expand Down

0 comments on commit 9b833ed

Please sign in to comment.