From 9b833edee29d173b9fd304ae456eea362451c863 Mon Sep 17 00:00:00 2001 From: saba_zedginidze Date: Tue, 5 Nov 2024 17:16:42 +0400 Subject: [PATCH] [MODINVOSTO-187] Use string queries instead of conn save and update as it is only for id-jsonb fields --- .../dao/audit/AuditOutboxEventLogDAO.java | 2 +- .../audit/AuditOutboxEventLogPostgresDAO.java | 32 +++++++++++------ .../org/folio/rest/impl/AuditOutboxAPI.java | 7 +--- .../service/audit/AuditEventProducer.java | 4 +-- .../service/audit/AuditOutboxService.java | 35 ++++++++++--------- .../folio/service/util/OutboxEventFields.java | 17 +++++++++ .../templates/db_scripts/schema.json | 4 +-- 7 files changed, 63 insertions(+), 38 deletions(-) create mode 100644 src/main/java/org/folio/service/util/OutboxEventFields.java diff --git a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java index a145b4ba..aff5c0f0 100644 --- a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java +++ b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java @@ -11,7 +11,7 @@ public interface AuditOutboxEventLogDAO { Future> getEventLogs(Conn conn, String tenantId); - Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId); + Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId); Future deleteEventLogs(Conn conn, List eventIds, String tenantId); diff --git a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java index ac85ed9b..74bb734f 100644 --- a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java +++ b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java @@ -7,20 +7,22 @@ import org.folio.rest.jaxrs.model.OutboxEventLog; import org.folio.rest.persist.Conn; -import org.folio.rest.persist.Criteria.Criterion; -import org.folio.rest.persist.Criteria.Limit; -import org.folio.rest.persist.interfaces.Results; +import org.folio.service.util.OutboxEventFields; import io.vertx.core.Future; +import io.vertx.sqlclient.Row; import io.vertx.sqlclient.SqlResult; import io.vertx.sqlclient.Tuple; import lombok.extern.log4j.Log4j2; +import one.util.streamex.StreamEx; @Log4j2 public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO { public static final String OUTBOX_TABLE_NAME = "outbox_event_log"; - private static final String BATCH_DELETE = "DELETE from %s where event_id = ANY ($1)"; + private static final String SELECT_SQL = "SELECT * FROM %s LIMIT 1000"; + private static final String INSERT_SQL = "INSERT INTO %s (event_id, entity_type, action, payload) VALUES ($1, $2, $3, $4)"; + private static final String BATCH_DELETE_SQL = "DELETE from %s where event_id = ANY ($1)"; /** * Get all event logs from outbox table. @@ -32,8 +34,8 @@ public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO { public Future> getEventLogs(Conn conn, String tenantId) { log.trace("getEventLogs:: Fetching event logs from outbox table for tenantId: '{}'", tenantId); var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); - return conn.get(tableName, OutboxEventLog.class, new Criterion().setLimit(new Limit(1000))) - .map(Results::getResults) + return conn.execute(SELECT_SQL.formatted(tableName)) + .map(rows -> StreamEx.of(rows.iterator()).map(this::convertDbRowToOutboxEventLog).toList()) .onFailure(t -> log.warn("getEventLogs:: Failed to fetch event logs for tenantId: '{}'", tenantId, t)); } @@ -45,11 +47,13 @@ public Future> getEventLogs(Conn conn, String tenantId) { * @param tenantId the tenant id * @return future of id of the inserted entity */ - public Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) { + public Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) { log.debug("saveEventLog:: Saving event log to outbox table with eventId: '{}'", eventLog.getEventId()); var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); - return conn.save(tableName, eventLog) - .onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t)); + Tuple params = Tuple.of(eventLog.getEventId(), eventLog.getEntityType().value(), eventLog.getAction(), eventLog.getPayload()); + return conn.execute(INSERT_SQL.formatted(tableName), params) + .onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t)) + .mapEmpty(); } /** @@ -64,9 +68,17 @@ public Future deleteEventLogs(Conn conn, List eventIds, String log.debug("deleteEventLogs:: Deleting outbox logs by event ids in batch: '{}'", eventIds); var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); var param = eventIds.stream().map(UUID::fromString).toArray(UUID[]::new); - return conn.execute(BATCH_DELETE.formatted(tableName), Tuple.of(param)) + return conn.execute(BATCH_DELETE_SQL.formatted(tableName), Tuple.of(param)) .map(SqlResult::rowCount) .onFailure(t -> log.warn("deleteEventLogs: Failed to delete event logs by ids: '{}'", eventIds, t)); } + private OutboxEventLog convertDbRowToOutboxEventLog(Row row) { + return new OutboxEventLog() + .withEventId(row.getUUID(OutboxEventFields.EVENT_ID.getName()).toString()) + .withEntityType(OutboxEventLog.EntityType.fromValue(row.getString(OutboxEventFields.ENTITY_TYPE.getName()))) + .withAction(row.getString(OutboxEventFields.ACTION.getName())) + .withPayload(row.getString(OutboxEventFields.PAYLOAD.getName())); + } + } diff --git a/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java b/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java index e8309edb..5fc363a9 100644 --- a/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java +++ b/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java @@ -13,9 +13,7 @@ import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; -import lombok.extern.log4j.Log4j2; -@Log4j2 public class AuditOutboxAPI implements InvoiceStorageAuditOutbox { @Autowired @@ -29,9 +27,6 @@ public AuditOutboxAPI() { public void postInvoiceStorageAuditOutboxProcess(Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { auditOutboxService.processOutboxEventLogs(okapiHeaders, vertxContext) .onSuccess(res -> asyncResultHandler.handle(Future.succeededFuture(Response.ok().build()))) - .onFailure(cause -> { - log.warn("postInvoiceStorageAuditOutboxProcess:: Processing of outbox events table has failed", cause); - asyncResultHandler.handle(Future.failedFuture(cause)); - }); + .onFailure(cause -> asyncResultHandler.handle(Future.failedFuture(cause))); } } diff --git a/src/main/java/org/folio/service/audit/AuditEventProducer.java b/src/main/java/org/folio/service/audit/AuditEventProducer.java index 1117464b..de9f3245 100644 --- a/src/main/java/org/folio/service/audit/AuditEventProducer.java +++ b/src/main/java/org/folio/service/audit/AuditEventProducer.java @@ -47,7 +47,7 @@ public Future sendInvoiceEvent(Invoice invoice, InvoiceAuditEvent.Action e public Future sendInvoiceLineEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action action, Map okapiHeaders) { var event = getAuditEvent(invoiceLine, action); log.info("sendInvoiceLineEvent:: Sending event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId()); - return sendToKafka(EventTopic.ACQ_INVOICE_LINE_CHANGED, event.getInvoiceId(), event, okapiHeaders) + return sendToKafka(EventTopic.ACQ_INVOICE_LINE_CHANGED, event.getInvoiceLineId(), event, okapiHeaders) .onFailure(t -> log.warn("sendInvoiceLineEvent:: Failed to send event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId(), t)); } @@ -62,7 +62,7 @@ private InvoiceAuditEvent getAuditEvent(Invoice invoice, InvoiceAuditEvent.Actio .withInvoiceSnapshot(invoice.withMetadata(null)); } - private InvoiceAuditEvent getAuditEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action eventAction) { + private InvoiceLineAuditEvent getAuditEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action eventAction) { return new InvoiceLineAuditEvent() .withId(UUID.randomUUID().toString()) .withAction(eventAction) diff --git a/src/main/java/org/folio/service/audit/AuditOutboxService.java b/src/main/java/org/folio/service/audit/AuditOutboxService.java index aac5a800..dde6207e 100644 --- a/src/main/java/org/folio/service/audit/AuditOutboxService.java +++ b/src/main/java/org/folio/service/audit/AuditOutboxService.java @@ -45,20 +45,21 @@ public Future processOutboxEventLogs(Map okapiHeaders, var tenantId = TenantTool.tenantId(okapiHeaders); return new DBClient(vertxContext, okapiHeaders).getPgClient() .withTrans(conn -> internalLockDAO.selectWithLocking(conn, OUTBOX_LOCK_NAME, tenantId) - .compose(retrievedCount -> outboxEventLogDAO.getEventLogs(conn, tenantId)) - .compose(logs -> { - if (CollectionUtils.isEmpty(logs)) { - log.info("processOutboxEventLogs: No logs found in outbox table"); - return Future.succeededFuture(0); - } - log.info("processOutboxEventLogs: {} logs found in outbox table, sending to kafka", logs.size()); - return GenericCompositeFuture.join(sendEventLogsToKafka(logs, okapiHeaders)) - .map(logs.stream().map(OutboxEventLog::getEventId).toList()) - .compose(eventIds -> outboxEventLogDAO.deleteEventLogs(conn, eventIds, tenantId)) - .onSuccess(count -> log.info("processOutboxEventLogs:: {} logs have been deleted from outbox table", count)) - .onFailure(ex -> log.error("Logs deletion failed", ex)); - }) - ); + .compose(retrievedCount -> outboxEventLogDAO.getEventLogs(conn, tenantId)) + .compose(logs -> { + if (CollectionUtils.isEmpty(logs)) { + log.info("processOutboxEventLogs: No logs found in outbox table"); + return Future.succeededFuture(0); + } + log.info("processOutboxEventLogs: {} logs found in outbox table, sending to kafka", logs.size()); + return GenericCompositeFuture.join(sendEventLogsToKafka(logs, okapiHeaders)) + .map(logs.stream().map(OutboxEventLog::getEventId).toList()) + .compose(eventIds -> outboxEventLogDAO.deleteEventLogs(conn, eventIds, tenantId)) + .onSuccess(count -> log.info("processOutboxEventLogs:: {} logs have been deleted from outbox table", count)) + .onFailure(ex -> log.error("Logs deletion failed", ex)); + }) + .onSuccess(count -> log.info("processOutboxEventLogs:: Successfully processed outbox event logs: {}", count)) + .onFailure(ex -> log.error("Failed to process outbox event logs", ex))); } private List> sendEventLogsToKafka(List eventLogs, Map okapiHeaders) { @@ -86,7 +87,7 @@ private List> sendEventLogsToKafka(List eventLogs, * @param okapiHeaders okapi headers * @return future with saved outbox log id in the same transaction */ - public Future saveInvoiceOutboxLog(Conn conn, Invoice entity, InvoiceAuditEvent.Action action, Map okapiHeaders) { + public Future saveInvoiceOutboxLog(Conn conn, Invoice entity, InvoiceAuditEvent.Action action, Map okapiHeaders) { return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.INVOICE, entity.getId(), entity); } @@ -99,11 +100,11 @@ public Future saveInvoiceOutboxLog(Conn conn, Invoice entity, InvoiceAud * @param okapiHeaders okapi headers * @return future with saved outbox log id in the same transaction */ - public Future saveInvoiceLineOutboxLog(Conn conn, InvoiceLine entity, InvoiceLineAuditEvent.Action action, Map okapiHeaders) { + public Future saveInvoiceLineOutboxLog(Conn conn, InvoiceLine entity, InvoiceLineAuditEvent.Action action, Map okapiHeaders) { return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.INVOICE_LINE, entity.getId(), entity); } - private Future saveOutboxLog(Conn conn, Map okapiHeaders, String action, EntityType entityType, String entityId, Object entity) { + private Future saveOutboxLog(Conn conn, Map okapiHeaders, String action, EntityType entityType, String entityId, Object entity) { log.debug("saveOutboxLog:: Saving outbox log for {} with id: {}", entityType, entityId); var eventLog = new OutboxEventLog() .withEventId(UUID.randomUUID().toString()) diff --git a/src/main/java/org/folio/service/util/OutboxEventFields.java b/src/main/java/org/folio/service/util/OutboxEventFields.java new file mode 100644 index 00000000..ad7ce7d0 --- /dev/null +++ b/src/main/java/org/folio/service/util/OutboxEventFields.java @@ -0,0 +1,17 @@ +package org.folio.service.util; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public enum OutboxEventFields { + + EVENT_ID("event_id"), + ENTITY_TYPE("entity_type"), + ACTION("action"), + PAYLOAD("payload"); + + private final String name; + +} diff --git a/src/main/resources/templates/db_scripts/schema.json b/src/main/resources/templates/db_scripts/schema.json index 8b90e8ea..9a95be83 100644 --- a/src/main/resources/templates/db_scripts/schema.json +++ b/src/main/resources/templates/db_scripts/schema.json @@ -88,12 +88,12 @@ { "run": "after", "snippetPath": "tables/create_audit_outbox_table.sql", - "fromModuleVersion": "mod-invoice-storage-5.9.0" + "fromModuleVersion": "mod-invoice-storage-6.0.0" }, { "run": "after", "snippetPath": "tables/create_internal_lock_table.sql", - "fromModuleVersion": "mod-invoice-storage-5.9.0" + "fromModuleVersion": "mod-invoice-storage-6.0.0" } ], "tables": [