diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json index 3d39e07e..cb70d409 100644 --- a/descriptors/ModuleDescriptor-template.json +++ b/descriptors/ModuleDescriptor-template.json @@ -338,6 +338,20 @@ "permissionsRequired": ["batch-group-storage.batch-groups.item.delete"] } ] + }, + { + "id": "_timer", + "version": "1.0", + "interfaceType": "system", + "handlers": [ + { + "methods": ["POST"], + "pathPattern": "/invoice-storage/audit-outbox/process", + "modulePermissions": [], + "unit": "minute", + "delay": "30" + } + ] } ], "permissionSets": [ @@ -703,7 +717,9 @@ { "name": "DB_DATABASE", "value": "okapi_modules" }, { "name": "DB_QUERYTIMEOUT", "value": "60000" }, { "name": "DB_CHARSET", "value": "UTF-8" }, - { "name": "DB_MAXPOOLSIZE", "value": "5" } + { "name": "DB_MAXPOOLSIZE", "value": "5" }, + { "name": "KAFKA_HOST", "value": "10.0.2.15" }, + { "name": "KAFKA_PORT", "value": "9092" } ] } } diff --git a/pom.xml b/pom.xml index a938fdef..e7aeb96c 100644 --- a/pom.xml +++ b/pom.xml @@ -24,10 +24,12 @@ 4.5.10 + 3.6.1 2.24.1 1.0.0 + 3.2.0 1.9.22.1 @@ -156,6 +158,16 @@ ${raml-module-builder.version} test + + org.folio + folio-kafka-wrapper + ${folio-kafka-wrapper.version} + + + io.vertx + vertx-kafka-client + ${vertx.version} + org.mockito mockito-junit-jupiter @@ -172,6 +184,35 @@ org.apache.logging.log4j log4j-slf4j-impl + + org.apache.kafka + kafka-clients + ${kafkaclients.version} + + + net.mguenther.kafka + kafka-junit + 3.6.0 + test + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + net.sf.jopt-simple + jopt-simple + + + com.fasterxml.jackson.module + jackson-module-scala_2.13 + + + @@ -343,6 +384,10 @@ true false true + + **/impl/*.java + **/*.aj + org.folio diff --git a/ramls/acq-models b/ramls/acq-models index d117c820..0d59a293 160000 --- a/ramls/acq-models +++ b/ramls/acq-models @@ -1 +1 @@ -Subproject commit d117c8209d9b5a4f6248d48d497000675aec4f45 +Subproject commit 0d59a293cf690938838980dcb29d5f17d652e927 diff --git a/ramls/audit-outbox.raml b/ramls/audit-outbox.raml new file mode 100644 index 00000000..2901d15a --- /dev/null +++ b/ramls/audit-outbox.raml @@ -0,0 +1,20 @@ +#%RAML 1.0 +title: Audit outbox API +version: v1.0 +protocols: [ HTTP, HTTPS ] +baseUri: http://github.com/folio-org/mod-invoice-storage + +documentation: + - title: Audit outbox API + content: This API is intended for internal use only by the Timer interface + +types: + outbox-event-log: !include acq-models/mod-invoice-storage/schemas/outbox_event_log.json + invoice-audit-event: !include acq-models/mod-invoice-storage/schemas/invoice_audit_event.json + invoice-line-audit-event: !include acq-models/mod-invoice-storage/schemas/invoice_line_audit_event.json + event-topic: !include acq-models/mod-invoice-storage/schemas/event_topic.json + +/invoice-storage/audit-outbox: + /process: + post: + description: Read audit events from DB and send them to Kafka diff --git a/src/main/java/org/folio/config/ApplicationConfig.java b/src/main/java/org/folio/config/ApplicationConfig.java index a4b6ee75..53e00ef3 100644 --- a/src/main/java/org/folio/config/ApplicationConfig.java +++ b/src/main/java/org/folio/config/ApplicationConfig.java @@ -1,17 +1,27 @@ package org.folio.config; +import org.folio.dao.audit.AuditOutboxEventLogDAO; +import org.folio.dao.audit.AuditOutboxEventLogPostgresDAO; import org.folio.dao.invoice.InvoiceDAO; import org.folio.dao.invoice.InvoicePostgresDAO; import org.folio.dao.lines.InvoiceLinesDAO; import org.folio.dao.lines.InvoiceLinesPostgresDAO; +import org.folio.dao.lock.InternalLockDAO; +import org.folio.dao.lock.InternalLockPostgresDAO; +import org.folio.kafka.KafkaConfig; import org.folio.rest.core.RestClient; import org.folio.service.InvoiceLineNumberService; +import org.folio.service.InvoiceLineStorageService; import org.folio.service.InvoiceStorageService; +import org.folio.service.audit.AuditEventProducer; +import org.folio.service.audit.AuditOutboxService; import org.folio.service.order.OrderStorageService; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; @Configuration +@Import(KafkaConfiguration.class) public class ApplicationConfig { @Bean @@ -34,12 +44,38 @@ public InvoiceLinesDAO invoiceLinesDAO() { } @Bean - public InvoiceStorageService invoiceStorageService(InvoiceDAO invoiceDAO) { - return new InvoiceStorageService(invoiceDAO); + public AuditOutboxEventLogDAO auditOutboxEventLogDAO() { + return new AuditOutboxEventLogPostgresDAO(); + } + + @Bean + public InternalLockDAO internalLockDAO() { + return new InternalLockPostgresDAO(); + } + + @Bean + public InvoiceStorageService invoiceStorageService(InvoiceDAO invoiceDAO, AuditOutboxService auditOutboxService) { + return new InvoiceStorageService(invoiceDAO, auditOutboxService); + } + + @Bean + public InvoiceLineStorageService invoiceLineStorageService(InvoiceLinesDAO invoiceLinesDAO, AuditOutboxService auditOutboxService) { + return new InvoiceLineStorageService(invoiceLinesDAO, auditOutboxService); } @Bean public InvoiceLineNumberService invoiceLineNumberService(InvoiceDAO invoiceDAO, InvoiceLinesDAO invoiceLinesDAO) { return new InvoiceLineNumberService(invoiceDAO, invoiceLinesDAO); } + + @Bean + public AuditEventProducer auditEventProducer(KafkaConfig kafkaConfig) { + return new AuditEventProducer(kafkaConfig); + } + + @Bean + public AuditOutboxService auditOutboxService(AuditOutboxEventLogDAO auditOutboxEventLogDAO, InternalLockDAO internalLockDAO, AuditEventProducer producer) { + return new AuditOutboxService(auditOutboxEventLogDAO, internalLockDAO, producer); + } + } diff --git a/src/main/java/org/folio/config/KafkaConfiguration.java b/src/main/java/org/folio/config/KafkaConfiguration.java new file mode 100644 index 00000000..65830b95 --- /dev/null +++ b/src/main/java/org/folio/config/KafkaConfiguration.java @@ -0,0 +1,36 @@ +package org.folio.config; + +import org.folio.kafka.KafkaConfig; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class KafkaConfiguration { + + @Value("${KAFKA_HOST:kafka}") + private String kafkaHost; + @Value("${KAFKA_PORT:9092}") + private String kafkaPort; + @Value("${OKAPI_URL:http://okapi:9130}") + private String okapiUrl; + @Value("${REPLICATION_FACTOR:1}") + private int replicationFactor; + @Value("${MAX_REQUEST_SIZE:1048576}") + private int maxRequestSize; + @Value("${ENV:folio}") + private String envId; + + @Bean + public KafkaConfig kafkaConfig() { + return KafkaConfig.builder() + .envId(envId) + .kafkaHost(kafkaHost) + .kafkaPort(kafkaPort) + .okapiUrl(okapiUrl) + .replicationFactor(replicationFactor) + .maxRequestSize(maxRequestSize) + .build(); + } + +} diff --git a/src/main/java/org/folio/dao/DbUtils.java b/src/main/java/org/folio/dao/DbUtils.java new file mode 100644 index 00000000..4cbb6df0 --- /dev/null +++ b/src/main/java/org/folio/dao/DbUtils.java @@ -0,0 +1,27 @@ +package org.folio.dao; + +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard; + +import io.vertx.core.Future; +import io.vertx.ext.web.handler.HttpException; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; + +public class DbUtils { + + private static final String TABLE_NAME_TEMPLATE = "%s.%s"; + + public static String getTenantTableName(String tenantId, String tableName) { + return TABLE_NAME_TEMPLATE.formatted(convertToPsqlStandard(tenantId), tableName); + } + + public static Future verifyEntityUpdate(RowSet updated) { + return updated.rowCount() == 1 + ? Future.succeededFuture() + : Future.failedFuture(new HttpException(NOT_FOUND.getStatusCode(), NOT_FOUND.getReasonPhrase())); + } + + private DbUtils() {} + +} diff --git a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java new file mode 100644 index 00000000..aff5c0f0 --- /dev/null +++ b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java @@ -0,0 +1,18 @@ +package org.folio.dao.audit; + +import java.util.List; + +import org.folio.rest.jaxrs.model.OutboxEventLog; +import org.folio.rest.persist.Conn; + +import io.vertx.core.Future; + +public interface AuditOutboxEventLogDAO { + + Future> getEventLogs(Conn conn, String tenantId); + + Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId); + + Future deleteEventLogs(Conn conn, List eventIds, String tenantId); + +} diff --git a/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java new file mode 100644 index 00000000..74bb734f --- /dev/null +++ b/src/main/java/org/folio/dao/audit/AuditOutboxEventLogPostgresDAO.java @@ -0,0 +1,84 @@ +package org.folio.dao.audit; + +import static org.folio.dao.DbUtils.getTenantTableName; + +import java.util.List; +import java.util.UUID; + +import org.folio.rest.jaxrs.model.OutboxEventLog; +import org.folio.rest.persist.Conn; +import org.folio.service.util.OutboxEventFields; + +import io.vertx.core.Future; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.Tuple; +import lombok.extern.log4j.Log4j2; +import one.util.streamex.StreamEx; + +@Log4j2 +public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO { + + public static final String OUTBOX_TABLE_NAME = "outbox_event_log"; + private static final String SELECT_SQL = "SELECT * FROM %s LIMIT 1000"; + private static final String INSERT_SQL = "INSERT INTO %s (event_id, entity_type, action, payload) VALUES ($1, $2, $3, $4)"; + private static final String BATCH_DELETE_SQL = "DELETE from %s where event_id = ANY ($1)"; + + /** + * Get all event logs from outbox table. + * + * @param conn the sql connection from transaction + * @param tenantId the tenant id + * @return future of a list of fetched event logs + */ + public Future> getEventLogs(Conn conn, String tenantId) { + log.trace("getEventLogs:: Fetching event logs from outbox table for tenantId: '{}'", tenantId); + var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); + return conn.execute(SELECT_SQL.formatted(tableName)) + .map(rows -> StreamEx.of(rows.iterator()).map(this::convertDbRowToOutboxEventLog).toList()) + .onFailure(t -> log.warn("getEventLogs:: Failed to fetch event logs for tenantId: '{}'", tenantId, t)); + } + + /** + * Saves event log to outbox table. + * + * @param conn the sql connection from transaction + * @param eventLog the event log to save + * @param tenantId the tenant id + * @return future of id of the inserted entity + */ + public Future saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) { + log.debug("saveEventLog:: Saving event log to outbox table with eventId: '{}'", eventLog.getEventId()); + var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); + Tuple params = Tuple.of(eventLog.getEventId(), eventLog.getEntityType().value(), eventLog.getAction(), eventLog.getPayload()); + return conn.execute(INSERT_SQL.formatted(tableName), params) + .onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t)) + .mapEmpty(); + } + + /** + * Deletes outbox logs by event ids in batch. + * + * @param conn the sql connection from transaction + * @param eventIds the event ids to delete + * @param tenantId the tenant id + * @return future of row count for deleted records + */ + public Future deleteEventLogs(Conn conn, List eventIds, String tenantId) { + log.debug("deleteEventLogs:: Deleting outbox logs by event ids in batch: '{}'", eventIds); + var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME); + var param = eventIds.stream().map(UUID::fromString).toArray(UUID[]::new); + return conn.execute(BATCH_DELETE_SQL.formatted(tableName), Tuple.of(param)) + .map(SqlResult::rowCount) + .onFailure(t -> log.warn("deleteEventLogs: Failed to delete event logs by ids: '{}'", eventIds, t)); + } + + private OutboxEventLog convertDbRowToOutboxEventLog(Row row) { + return new OutboxEventLog() + .withEventId(row.getUUID(OutboxEventFields.EVENT_ID.getName()).toString()) + .withEntityType(OutboxEventLog.EntityType.fromValue(row.getString(OutboxEventFields.ENTITY_TYPE.getName()))) + .withAction(row.getString(OutboxEventFields.ACTION.getName())) + .withPayload(row.getString(OutboxEventFields.PAYLOAD.getName())); + } + +} diff --git a/src/main/java/org/folio/dao/invoice/InvoiceDAO.java b/src/main/java/org/folio/dao/invoice/InvoiceDAO.java index 8849753e..6c8238c3 100644 --- a/src/main/java/org/folio/dao/invoice/InvoiceDAO.java +++ b/src/main/java/org/folio/dao/invoice/InvoiceDAO.java @@ -10,8 +10,8 @@ public interface InvoiceDAO { Future getInvoiceByIdForUpdate(String invoiceId, Conn conn); - Future createInvoice(Invoice invoice, DBClient client); - Future updateInvoice(Invoice invoice, Conn conn); + Future createInvoice(Invoice invoice, Conn conn); + Future updateInvoice(String id, Invoice invoice, Conn conn); Future deleteInvoice(String id, DBClient client); Future deleteInvoiceLinesByInvoiceId(String id, DBClient client); Future deleteInvoiceDocumentsByInvoiceId(String id, DBClient client); diff --git a/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java b/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java index 683c6502..df134422 100644 --- a/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java +++ b/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java @@ -5,6 +5,7 @@ import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_ID_FIELD_NAME; import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_LINE_TABLE; import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_TABLE; +import static org.folio.rest.utils.ResponseUtils.convertPgExceptionIfNeeded; import static org.folio.rest.utils.ResponseUtils.handleFailure; import java.util.UUID; @@ -14,6 +15,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.folio.dao.DbUtils; import org.folio.dbschema.ObjectMapperTool; import org.folio.rest.jaxrs.model.Contents; import org.folio.rest.jaxrs.model.DocumentMetadata; @@ -49,32 +51,26 @@ public Future getInvoiceByIdForUpdate(String invoiceId, Conn conn) { } @Override - public Future createInvoice(Invoice invoice, DBClient client) { + public Future createInvoice(Invoice invoice, Conn conn) { log.info("Creating new invoice with id={}", invoice.getId()); - Promise promise = Promise.promise(); if (invoice.getId() == null) { invoice.setId(UUID.randomUUID().toString()); } if (invoice.getNextInvoiceLineNumber() == null) { invoice.setNextInvoiceLineNumber(1); } - client.getPgClient().save(client.getConnection(), INVOICE_TABLE, invoice.getId(), invoice, reply -> { - if (reply.failed()) { - String errorMessage = String.format("Invoice creation with id=%s failed", invoice.getId()); - log.error(errorMessage, reply.cause()); - handleFailure(promise, reply); - } else { - log.info("New invoice with id={} successfully created", invoice.getId()); - promise.complete(client); - } - }); - return promise.future(); + return conn.save(INVOICE_TABLE, invoice.getId(), invoice, true) + .recover(t -> Future.failedFuture(convertPgExceptionIfNeeded(t))) + .onSuccess(s -> log.info("createInvoice:: New invoice with id: '{}' successfully created", invoice.getId())) + .onFailure(t -> log.error("Failed to create invoice with id: '{}'", invoice.getId(), t)); } @Override - public Future updateInvoice(Invoice invoice, Conn conn) { - return conn.update(INVOICE_TABLE, invoice, invoice.getId()) - .onFailure(t -> log.error("updateInvoice failed for invoice with id {}", invoice.getId(), t)) + public Future updateInvoice(String id, Invoice invoice, Conn conn) { + return conn.update(INVOICE_TABLE, invoice, id) + .compose(DbUtils::verifyEntityUpdate) + .onSuccess(v -> log.info("updateInvoice:: Invoice with id: '{}' successfully updated", invoice.getId())) + .onFailure(t -> log.error("Update failed for invoice with id: '{}'", invoice.getId(), t)) .mapEmpty(); } @@ -185,7 +181,7 @@ public Future getInvoiceDocument(String invoiceId, String docum invoiceDocument.setMetadata(documentMetadata.getMetadata()); String base64Content = reply.result().iterator().next().getString(1); - if (StringUtils.isNotEmpty(base64Content)){ + if (StringUtils.isNotEmpty(base64Content)) { invoiceDocument.setContents(new Contents().withData(base64Content)); } log.info("Invoice document with invoiceId={} and documentId={} successfully retrieved", invoiceId, documentId); diff --git a/src/main/java/org/folio/dao/lines/InvoiceLinesDAO.java b/src/main/java/org/folio/dao/lines/InvoiceLinesDAO.java index a98fb951..11ddb022 100644 --- a/src/main/java/org/folio/dao/lines/InvoiceLinesDAO.java +++ b/src/main/java/org/folio/dao/lines/InvoiceLinesDAO.java @@ -8,5 +8,11 @@ import java.util.List; public interface InvoiceLinesDAO { + Future> getInvoiceLines(Criterion criterion, Conn conn); + + Future createInvoiceLine(InvoiceLine invoiceLine, Conn conn); + + Future updateInvoiceLine(String id, InvoiceLine invoiceLine, Conn conn); + } diff --git a/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java b/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java index 99f8f53c..8fc84aa6 100644 --- a/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java +++ b/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java @@ -3,25 +3,46 @@ import io.vertx.core.Future; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.folio.dao.DbUtils; import org.folio.rest.jaxrs.model.InvoiceLine; import org.folio.rest.persist.Conn; import org.folio.rest.persist.Criteria.Criterion; +import org.folio.rest.persist.interfaces.Results; import java.util.List; import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_LINE_TABLE; +import static org.folio.rest.utils.ResponseUtils.convertPgExceptionIfNeeded; public class InvoiceLinesPostgresDAO implements InvoiceLinesDAO { private final Logger log = LogManager.getLogger(); @Override public Future> getInvoiceLines(Criterion criterion, Conn conn) { - log.trace("InvoiceLinesPostgresDAO getInvoiceLines, criterion={}", criterion); + log.trace("getInvoiceLines:: Getting invoice lines with criterion: {}", criterion); return conn.get(INVOICE_LINE_TABLE, InvoiceLine.class, criterion, false) - .map(results -> { - log.trace("getInvoiceLines success, criterion={}", criterion); - return results.getResults(); - }) - .onFailure(t -> log.error("getInvoiceLines failed, criterion={}", criterion, t)); + .map(Results::getResults) + .onSuccess(lines -> log.trace("getInvoiceLines:: Got {} invoice lines with criterion: {}", lines.size(), criterion)) + .onFailure(t -> log.error("Failed to get invoice lines with criterion: {}", criterion, t)); } + + @Override + public Future createInvoiceLine(InvoiceLine invoiceLine, Conn conn) { + log.trace("createInvoiceLine:: Creating invoice line: {}", invoiceLine); + return conn.save(INVOICE_LINE_TABLE, invoiceLine.getId(), invoiceLine, true) + .recover(t -> Future.failedFuture(convertPgExceptionIfNeeded(t))) + .onSuccess(invoiceLineId -> log.info("createInvoiceLine:: Created invoice line with id: {}", invoiceLineId)) + .onFailure(t -> log.error("Failed to create invoice line with id: {}", invoiceLine.getId(), t)); + } + + @Override + public Future updateInvoiceLine(String id, InvoiceLine invoiceLine, Conn conn) { + log.trace("updateInvoiceLine:: Updating invoice line: {}", invoiceLine); + return conn.update(INVOICE_LINE_TABLE, invoiceLine, id) + .compose(DbUtils::verifyEntityUpdate) + .onSuccess(v -> log.info("updateInvoiceLine:: Updated invoice line with id: {}", id)) + .onFailure(t -> log.error("Failed to update invoice line with id: {}", id, t)) + .mapEmpty(); + } + } diff --git a/src/main/java/org/folio/dao/lock/InternalLockDAO.java b/src/main/java/org/folio/dao/lock/InternalLockDAO.java new file mode 100644 index 00000000..e1069686 --- /dev/null +++ b/src/main/java/org/folio/dao/lock/InternalLockDAO.java @@ -0,0 +1,11 @@ +package org.folio.dao.lock; + +import org.folio.rest.persist.Conn; + +import io.vertx.core.Future; + +public interface InternalLockDAO { + + Future selectWithLocking(Conn conn, String lockName, String tenantId); + +} diff --git a/src/main/java/org/folio/dao/lock/InternalLockPostgresDAO.java b/src/main/java/org/folio/dao/lock/InternalLockPostgresDAO.java new file mode 100644 index 00000000..170cf417 --- /dev/null +++ b/src/main/java/org/folio/dao/lock/InternalLockPostgresDAO.java @@ -0,0 +1,34 @@ +package org.folio.dao.lock; + +import static org.folio.dao.DbUtils.getTenantTableName; + +import org.folio.rest.persist.Conn; + +import io.vertx.core.Future; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.Tuple; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class InternalLockPostgresDAO implements InternalLockDAO { + + private static final String LOCK_TABLE_NAME = "internal_lock"; + private static final String SELECT_WITH_LOCKING = "SELECT * FROM %s WHERE lock_name = $1 FOR UPDATE"; + + /** + * Performs SELECT FOR UPDATE statement in order to implement locking. + * Lock released after the transaction is committed. + * + * @param conn connection with active transaction + * @param lockName the lock name + * @param tenantId the tenant id + * @return future with 1 row if lock was acquired + */ + public Future selectWithLocking(Conn conn, String lockName, String tenantId) { + log.debug("selectWithLocking:: Locking row with lockName: '{}'", lockName); + var tableName = getTenantTableName(tenantId, LOCK_TABLE_NAME); + return conn.execute(SELECT_WITH_LOCKING.formatted(tableName), Tuple.of(lockName)) + .map(SqlResult::rowCount) + .onFailure(t -> log.warn("selectWithLocking:: Unable to select row with lockName: '{}'", lockName, t)); + } +} diff --git a/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java b/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java new file mode 100644 index 00000000..5fc363a9 --- /dev/null +++ b/src/main/java/org/folio/rest/impl/AuditOutboxAPI.java @@ -0,0 +1,32 @@ +package org.folio.rest.impl; + +import javax.ws.rs.core.Response; +import java.util.Map; + +import org.folio.service.audit.AuditOutboxService; +import org.folio.rest.jaxrs.resource.InvoiceStorageAuditOutbox; +import org.folio.spring.SpringContextUtil; +import org.springframework.beans.factory.annotation.Autowired; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; + +public class AuditOutboxAPI implements InvoiceStorageAuditOutbox { + + @Autowired + private AuditOutboxService auditOutboxService; + + public AuditOutboxAPI() { + SpringContextUtil.autowireDependencies(this, Vertx.currentContext()); + } + + @Override + public void postInvoiceStorageAuditOutboxProcess(Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { + auditOutboxService.processOutboxEventLogs(okapiHeaders, vertxContext) + .onSuccess(res -> asyncResultHandler.handle(Future.succeededFuture(Response.ok().build()))) + .onFailure(cause -> asyncResultHandler.handle(Future.failedFuture(cause))); + } +} diff --git a/src/main/java/org/folio/rest/impl/InvoiceStorageImpl.java b/src/main/java/org/folio/rest/impl/InvoiceStorageImpl.java index 2c9fee9b..1d50d6d8 100644 --- a/src/main/java/org/folio/rest/impl/InvoiceStorageImpl.java +++ b/src/main/java/org/folio/rest/impl/InvoiceStorageImpl.java @@ -12,6 +12,7 @@ import org.folio.rest.jaxrs.model.InvoiceLineCollection; import org.folio.rest.jaxrs.resource.InvoiceStorage; import org.folio.rest.persist.PgUtil; +import org.folio.service.InvoiceLineStorageService; import org.folio.service.InvoiceStorageService; import org.folio.spring.SpringContextUtil; import org.springframework.beans.factory.annotation.Autowired; @@ -28,10 +29,13 @@ public class InvoiceStorageImpl implements InvoiceStorage { public static final String INVOICE_ID_FIELD_NAME = "invoiceId"; public static final String INVOICE_LINE_TABLE = "invoice_lines"; public static final String INVOICE_PREFIX = "/invoice-storage/invoices/"; + public static final String INVOICE_LINES_PREFIX = "/invoice-storage/invoice-lines/"; public static final String INVOICE_TABLE = "invoices"; @Autowired private InvoiceStorageService invoiceStorageService; + @Autowired + private InvoiceLineStorageService invoiceLineStorageService; public InvoiceStorageImpl() { SpringContextUtil.autowireDependencies(this, Vertx.currentContext()); @@ -117,16 +121,14 @@ public void getInvoiceStorageInvoiceLines(String totalRecords, int offset, int l @Override public void postInvoiceStorageInvoiceLines(InvoiceLine entity, Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { - PgUtil.post(INVOICE_LINE_TABLE, entity, okapiHeaders, vertxContext, PostInvoiceStorageInvoiceLinesResponse.class, - asyncResultHandler); + invoiceLineStorageService.createInvoiceLine(entity, asyncResultHandler, vertxContext, okapiHeaders); } @Validate @Override public void putInvoiceStorageInvoiceLinesById(String id, InvoiceLine entity, Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { - PgUtil.put(INVOICE_LINE_TABLE, entity, id, okapiHeaders, vertxContext, PutInvoiceStorageInvoiceLinesByIdResponse.class, - asyncResultHandler); + invoiceLineStorageService.updateInvoiceLine(id, entity, okapiHeaders, asyncResultHandler, vertxContext); } @Validate diff --git a/src/main/java/org/folio/rest/utils/ResponseUtils.java b/src/main/java/org/folio/rest/utils/ResponseUtils.java index 2217d327..a59b97e7 100644 --- a/src/main/java/org/folio/rest/utils/ResponseUtils.java +++ b/src/main/java/org/folio/rest/utils/ResponseUtils.java @@ -4,6 +4,7 @@ import static javax.ws.rs.core.HttpHeaders.LOCATION; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; import static javax.ws.rs.core.MediaType.TEXT_PLAIN; +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import java.net.URI; @@ -46,12 +47,15 @@ public static Handler>> handleNoContentResponse(Handler promise, AsyncResult reply) { - Throwable cause = reply.cause(); - String badRequestMessage = PgExceptionUtil.badRequestMessage(cause); + promise.fail(convertPgExceptionIfNeeded(reply.cause())); + } + + public static Throwable convertPgExceptionIfNeeded(Throwable cause) { + var badRequestMessage = PgExceptionUtil.badRequestMessage(cause); if (badRequestMessage != null) { - promise.fail(new HttpException(Response.Status.BAD_REQUEST.getStatusCode(), badRequestMessage)); + return new HttpException(BAD_REQUEST.getStatusCode(), badRequestMessage); } else { - promise.fail(new HttpException(INTERNAL_SERVER_ERROR.getStatusCode(), cause.getMessage())); + return new HttpException(INTERNAL_SERVER_ERROR.getStatusCode(), cause.getMessage()); } } @@ -82,6 +86,10 @@ public static Future buildOkResponse(Object body) { return Future.succeededFuture(Response.ok(body, APPLICATION_JSON).build()); } + public static Future buildBadRequestResponse(String body) { + return Future.succeededFuture(buildErrorResponse(BAD_REQUEST.getStatusCode(), body)); + } + public static Future buildErrorResponse(Throwable throwable) { final String message; final int code; diff --git a/src/main/java/org/folio/service/InvoiceLineNumberService.java b/src/main/java/org/folio/service/InvoiceLineNumberService.java index 61176c9b..eb5e4fd9 100644 --- a/src/main/java/org/folio/service/InvoiceLineNumberService.java +++ b/src/main/java/org/folio/service/InvoiceLineNumberService.java @@ -69,7 +69,7 @@ public Future retrieveNewLineNumber(String invoiceId, DBClien log.debug("Updating invoice {} with new nextInvoiceLineNumber", invoiceId); int nextNumber = invoice.getNextInvoiceLineNumber(); invoice.setNextInvoiceLineNumber(nextNumber + 1); - return invoiceDAO.updateInvoice(invoice, conn) + return invoiceDAO.updateInvoice(invoiceId, invoice, conn) .map(v -> nextNumber); }) .map(n -> { diff --git a/src/main/java/org/folio/service/InvoiceLineStorageService.java b/src/main/java/org/folio/service/InvoiceLineStorageService.java new file mode 100644 index 00000000..255091bc --- /dev/null +++ b/src/main/java/org/folio/service/InvoiceLineStorageService.java @@ -0,0 +1,71 @@ +package org.folio.service; + +import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_LINES_PREFIX; +import static org.folio.rest.utils.ResponseUtils.buildBadRequestResponse; +import static org.folio.rest.utils.ResponseUtils.buildErrorResponse; +import static org.folio.rest.utils.ResponseUtils.buildNoContentResponse; +import static org.folio.rest.utils.ResponseUtils.buildResponseWithLocation; +import static org.folio.rest.utils.RestConstants.OKAPI_URL; + +import javax.ws.rs.core.Response; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.folio.dao.lines.InvoiceLinesDAO; +import org.folio.rest.jaxrs.model.InvoiceLine; +import org.folio.rest.jaxrs.model.InvoiceLineAuditEvent; +import org.folio.rest.persist.DBClient; +import org.folio.service.audit.AuditOutboxService; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@RequiredArgsConstructor +public class InvoiceLineStorageService { + + private final InvoiceLinesDAO invoiceLinesDAO; + private final AuditOutboxService auditOutboxService; + + public void createInvoiceLine(InvoiceLine invoiceLine, Handler> asyncResultHandler, + Context vertxContext, Map headers) { + log.info("createInvoiceLine:: Creating a new invoiceLine by id: {}", invoiceLine.getId()); + new DBClient(vertxContext, headers).getPgClient() + .withTrans(conn -> invoiceLinesDAO.createInvoiceLine(invoiceLine, conn) + .map(invoiceLine::withId) + .compose(invoiceLineId -> auditOutboxService.saveInvoiceLineOutboxLog(conn, invoiceLine, InvoiceLineAuditEvent.Action.CREATE, headers))) + .onSuccess(s -> { + log.info("createInvoiceLine:: Successfully created a new invoiceLine by id: {}", invoiceLine.getId()); + auditOutboxService.processOutboxEventLogs(headers, vertxContext); + asyncResultHandler.handle(buildResponseWithLocation(headers.get(OKAPI_URL), INVOICE_LINES_PREFIX + invoiceLine.getId(), invoiceLine)); + }) + .onFailure(f -> { + log.error("Error occurred while creating a new invoiceLine with id: {}", invoiceLine.getId(), f); + asyncResultHandler.handle(buildErrorResponse(f)); + }); + } + + public void updateInvoiceLine(String id, InvoiceLine invoiceLine, Map headers, + Handler> asyncResultHandler, Context vertxContext) { + log.info("updateInvoiceLine:: Updating invoice line with id: {}", id); + if (StringUtils.isBlank(id)) { + asyncResultHandler.handle(buildBadRequestResponse("Invoice line id is required")); + } + new DBClient(vertxContext, headers).getPgClient() + .withTrans(conn -> invoiceLinesDAO.updateInvoiceLine(id, invoiceLine, conn) + .compose(invoiceLineId -> auditOutboxService.saveInvoiceLineOutboxLog(conn, invoiceLine, InvoiceLineAuditEvent.Action.EDIT, headers))) + .onSuccess(s -> { + log.info("updateInvoiceLine:: Successfully updated invoice line with id: {}", id); + auditOutboxService.processOutboxEventLogs(headers, vertxContext); + asyncResultHandler.handle(buildNoContentResponse()); + }) + .onFailure(f -> { + log.error("Error occurred while updating invoice line with id: {}", id, f); + asyncResultHandler.handle(buildErrorResponse(f)); + }); + } + +} diff --git a/src/main/java/org/folio/service/InvoiceStorageService.java b/src/main/java/org/folio/service/InvoiceStorageService.java index 4daae9c6..10702516 100644 --- a/src/main/java/org/folio/service/InvoiceStorageService.java +++ b/src/main/java/org/folio/service/InvoiceStorageService.java @@ -4,8 +4,8 @@ import static org.folio.rest.impl.InvoiceStorageImpl.DOCUMENT_TABLE; import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_ID_FIELD_NAME; import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_PREFIX; -import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_TABLE; import static org.folio.rest.utils.HelperUtils.combineCqlExpressions; +import static org.folio.rest.utils.ResponseUtils.buildBadRequestResponse; import static org.folio.rest.utils.ResponseUtils.buildOkResponse; import static org.folio.rest.utils.ResponseUtils.buildErrorResponse; import static org.folio.rest.utils.ResponseUtils.buildNoContentResponse; @@ -17,64 +17,68 @@ import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.folio.dao.invoice.InvoiceDAO; import org.folio.rest.jaxrs.model.Document; import org.folio.rest.jaxrs.model.DocumentCollection; import org.folio.rest.jaxrs.model.Invoice; +import org.folio.rest.jaxrs.model.InvoiceAuditEvent; import org.folio.rest.jaxrs.model.InvoiceDocument; import org.folio.rest.jaxrs.resource.InvoiceStorage.GetInvoiceStorageInvoicesDocumentsByIdResponse; -import org.folio.rest.jaxrs.resource.InvoiceStorage.PutInvoiceStorageInvoicesByIdResponse; import org.folio.rest.persist.DBClient; import org.folio.rest.persist.PgUtil; +import org.folio.service.audit.AuditOutboxService; import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.ext.web.handler.HttpException; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +@Log4j2 +@RequiredArgsConstructor public class InvoiceStorageService { - private static final Logger log = LogManager.getLogger(InvoiceStorageService.class); private static final String INVOICE_ID_MISMATCH_ERROR_MESSAGE = "Invoice id mismatch"; private final InvoiceDAO invoiceDAO; - - public InvoiceStorageService(InvoiceDAO invoiceDAO) { - this.invoiceDAO = invoiceDAO; - } + private final AuditOutboxService auditOutboxService; public void postInvoiceStorageInvoices(Invoice invoice, Handler> asyncResultHandler, - Context vertxContext, Map headers) { - try { - vertxContext.runOnContext(v -> { - log.info("postInvoiceStorageInvoices:: Creating a new invoice by id: {}", invoice.getId()); - - DBClient client = new DBClient(vertxContext, headers); - client.startTx() - .compose(t -> invoiceDAO.createInvoice(invoice, client)) - .compose(t -> client.endTx()) - .onComplete(reply -> { - if (reply.failed()) { - // The result of rollback operation is not so important, main failure cause is used to build the response - client.rollbackTransaction().onComplete(res -> asyncResultHandler.handle(buildErrorResponse( - reply.cause()))); - } else { - log.info("postInvoiceStorageInvoices:: Preparing response to client"); - asyncResultHandler.handle( - buildResponseWithLocation(headers.get(OKAPI_URL), INVOICE_PREFIX + invoice.getId(), invoice) - ); - } - }); + Context vertxContext, Map headers) { + log.info("postInvoiceStorageInvoices:: Creating a new invoice by id: {}", invoice.getId()); + new DBClient(vertxContext, headers).getPgClient() + .withTrans(conn -> invoiceDAO.createInvoice(invoice, conn) + .compose(invoiceId -> auditOutboxService.saveInvoiceOutboxLog(conn, invoice, InvoiceAuditEvent.Action.CREATE, headers))) + .onSuccess(s -> { + log.info("postInvoiceStorageInvoices:: Successfully created a new invoice by id: {}", invoice.getId()); + auditOutboxService.processOutboxEventLogs(headers, vertxContext); + asyncResultHandler.handle(buildResponseWithLocation(headers.get(OKAPI_URL), INVOICE_PREFIX + invoice.getId(), invoice)); + }) + .onFailure(f -> { + log.error("Error occurred while creating a new invoice with id: {}", invoice.getId(), f); + asyncResultHandler.handle(buildErrorResponse(f)); }); - } catch (Exception e) { - log.error("Error occurred while creating a new invoice with id: {}", invoice.getId(), e); - asyncResultHandler.handle(buildErrorResponse( - new HttpException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), - Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase()) - )); + } + + public void putInvoiceStorageInvoicesById(String id, Invoice invoice, Map headers, + Handler> asyncResultHandler, Context vertxContext) { + log.info("putInvoiceStorageInvoicesById:: Updating invoice with id: {}", id); + if (StringUtils.isBlank(id)) { + asyncResultHandler.handle(buildBadRequestResponse("Invoice id is required")); } + new DBClient(vertxContext, headers).getPgClient() + .withTrans(conn -> invoiceDAO.updateInvoice(id, invoice, conn) + .compose(invoiceId -> auditOutboxService.saveInvoiceOutboxLog(conn, invoice, InvoiceAuditEvent.Action.EDIT, headers))) + .onSuccess(s -> { + log.info("putInvoiceStorageInvoicesById:: Successfully updated invoice with id: {}", id); + auditOutboxService.processOutboxEventLogs(headers, vertxContext); + asyncResultHandler.handle(buildNoContentResponse()); + }) + .onFailure(f -> { + log.error("Error occurred while updating invoice with id: {}", id, f); + asyncResultHandler.handle(buildErrorResponse(f)); + }); } public void deleteInvoiceStorageInvoicesById(String id, Handler> asyncResultHandler, @@ -108,20 +112,6 @@ public void deleteInvoiceStorageInvoicesById(String id, Handler okapiHeaders, - Handler> asyncResultHandler, Context vertxContext) { - log.debug("putInvoiceStorageInvoicesById:: Updating invoice with id: {}", id); - PgUtil.put(INVOICE_TABLE, invoice, id, okapiHeaders, vertxContext, - PutInvoiceStorageInvoicesByIdResponse.class, reply -> { - if (reply.succeeded()) { - log.info("putInvoiceStorageInvoicesById:: Invoice with id: {} was successfully updated", id); - } else { - log.error("Error occurred while updating invoice with id: {}", id, reply.cause()); - } - asyncResultHandler.handle(reply); - }); - } - public void getInvoiceStorageInvoicesDocumentsById(String id, int offset, int limit, String query, Map okapiHeaders, Handler> asyncResultHandler, Context vertxContext) { log.debug("getInvoiceStorageInvoicesDocumentsById:: Getting invoice documents by invoice id: {}", id); diff --git a/src/main/java/org/folio/service/audit/AuditEventProducer.java b/src/main/java/org/folio/service/audit/AuditEventProducer.java new file mode 100644 index 00000000..ce068d84 --- /dev/null +++ b/src/main/java/org/folio/service/audit/AuditEventProducer.java @@ -0,0 +1,109 @@ +package org.folio.service.audit; + +import java.util.Date; +import java.util.Map; +import java.util.UUID; + +import org.folio.kafka.KafkaConfig; +import org.folio.kafka.KafkaTopicNameHelper; +import org.folio.kafka.SimpleKafkaProducerManager; +import org.folio.kafka.services.KafkaProducerRecordBuilder; +import org.folio.rest.jaxrs.model.EventTopic; +import org.folio.rest.jaxrs.model.Invoice; +import org.folio.rest.jaxrs.model.InvoiceAuditEvent; +import org.folio.rest.jaxrs.model.InvoiceLineAuditEvent; +import org.folio.rest.jaxrs.model.InvoiceLine; +import org.folio.rest.tools.utils.TenantTool; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@RequiredArgsConstructor +public class AuditEventProducer { + + private final KafkaConfig kafkaConfig; + + /** + * Sends event for invoice change(Create, Edit) to kafka. + * InvoiceId is used as partition key to send all events for particular invoice to the same partition. + * + * @param invoice the event payload + * @param eventAction the event action + * @param okapiHeaders the okapi headers + * @return future with true if sending was success or failed future in another case + */ + public Future sendInvoiceEvent(Invoice invoice, InvoiceAuditEvent.Action eventAction, Map okapiHeaders) { + var event = getAuditEvent(invoice, eventAction); + log.info("sendInvoiceEvent:: Sending event with id: {} and invoiceId: {} to Kafka", event.getId(), invoice.getId()); + return sendToKafka(EventTopic.ACQ_INVOICE_CHANGED, event.getInvoiceId(), event, okapiHeaders) + .onFailure(t -> log.warn("sendInvoiceEvent:: Failed to send event with id: {} and invoiceId: {} to Kafka", event.getId(), invoice.getId(), t)); + } + + /** + * Sends event for invoice line change(Create, Edit) to kafka. + * InvoiceLineId is used as partition key to send all events for particular invoice line to the same partition. + * + * @param invoiceLine the event payload + * @param eventAction the event action + * @param okapiHeaders the okapi headers + * @return future with true if sending was success or failed future in another case + */ + public Future sendInvoiceLineEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action eventAction, Map okapiHeaders) { + var event = getAuditEvent(invoiceLine, eventAction); + log.info("sendInvoiceLineEvent:: Sending event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId()); + return sendToKafka(EventTopic.ACQ_INVOICE_LINE_CHANGED, event.getInvoiceLineId(), event, okapiHeaders) + .onFailure(t -> log.error("sendInvoiceLineEvent:: Failed to send event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId(), t)); + } + + private InvoiceAuditEvent getAuditEvent(Invoice invoice, InvoiceAuditEvent.Action eventAction) { + return new InvoiceAuditEvent() + .withId(UUID.randomUUID().toString()) + .withAction(eventAction) + .withInvoiceId(invoice.getId()) + .withEventDate(new Date()) + .withActionDate(invoice.getMetadata().getUpdatedDate()) + .withUserId(invoice.getMetadata().getUpdatedByUserId()) + .withInvoiceSnapshot(invoice.withMetadata(null)); + } + + private InvoiceLineAuditEvent getAuditEvent(InvoiceLine invoiceLine, InvoiceLineAuditEvent.Action eventAction) { + return new InvoiceLineAuditEvent() + .withId(UUID.randomUUID().toString()) + .withAction(eventAction) + .withInvoiceId(invoiceLine.getInvoiceId()) + .withInvoiceLineId(invoiceLine.getId()) + .withEventDate(new Date()) + .withActionDate(invoiceLine.getMetadata().getUpdatedDate()) + .withUserId(invoiceLine.getMetadata().getUpdatedByUserId()) + .withInvoiceLineSnapshot(invoiceLine.withMetadata(null)); + } + + private Future sendToKafka(EventTopic eventTopic, String key, Object eventPayload, Map okapiHeaders) { + var tenantId = TenantTool.tenantId(okapiHeaders); + var topicName = buildTopicName(kafkaConfig.getEnvId(), tenantId, eventTopic.value()); + KafkaProducerRecord kafkaProducerRecord = new KafkaProducerRecordBuilder(tenantId) + .key(key) + .value(eventPayload) + .topic(topicName) + .propagateOkapiHeaders(okapiHeaders) + .build(); + + var producerManager = new SimpleKafkaProducerManager(Vertx.currentContext().owner(), kafkaConfig); + KafkaProducer producer = producerManager.createShared(topicName); + return producer.send(kafkaProducerRecord) + .onSuccess(s -> log.info("sendToKafka:: Event for {} with id '{}' has been sent to kafka topic '{}'", eventTopic, key, topicName)) + .onFailure(t -> log.error("Failed to send event for {} with id '{}' to kafka topic '{}'", eventTopic, key, topicName, t)) + .onComplete(reply -> producer.end(v -> producer.close())) + .mapEmpty(); + } + + private String buildTopicName(String envId, String tenantId, String eventType) { + return KafkaTopicNameHelper.formatTopicName(envId, KafkaTopicNameHelper.getDefaultNameSpace(), tenantId, eventType); + } + +} diff --git a/src/main/java/org/folio/service/audit/AuditOutboxService.java b/src/main/java/org/folio/service/audit/AuditOutboxService.java new file mode 100644 index 00000000..dde6207e --- /dev/null +++ b/src/main/java/org/folio/service/audit/AuditOutboxService.java @@ -0,0 +1,119 @@ +package org.folio.service.audit; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.collections4.CollectionUtils; +import org.folio.dao.audit.AuditOutboxEventLogDAO; +import org.folio.dao.lock.InternalLockDAO; +import org.folio.okapi.common.GenericCompositeFuture; +import org.folio.rest.jaxrs.model.Invoice; +import org.folio.rest.jaxrs.model.InvoiceAuditEvent; +import org.folio.rest.jaxrs.model.InvoiceLine; +import org.folio.rest.jaxrs.model.InvoiceLineAuditEvent; +import org.folio.rest.jaxrs.model.OutboxEventLog; +import org.folio.rest.jaxrs.model.OutboxEventLog.EntityType; +import org.folio.rest.persist.Conn; +import org.folio.rest.persist.DBClient; +import org.folio.rest.tools.utils.TenantTool; + +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.json.Json; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@RequiredArgsConstructor +public class AuditOutboxService { + + private static final String OUTBOX_LOCK_NAME = "audit_outbox"; + + private final AuditOutboxEventLogDAO outboxEventLogDAO; + private final InternalLockDAO internalLockDAO; + private final AuditEventProducer producer; + + /** + * Reads outbox event logs from DB and send them to Kafka and delete from outbox table in a single transaction. + * + * @param okapiHeaders the okapi headers + * @param vertxContext the vertx context + * @return future with integer how many records have been processed + */ + public Future processOutboxEventLogs(Map okapiHeaders, Context vertxContext) { + var tenantId = TenantTool.tenantId(okapiHeaders); + return new DBClient(vertxContext, okapiHeaders).getPgClient() + .withTrans(conn -> internalLockDAO.selectWithLocking(conn, OUTBOX_LOCK_NAME, tenantId) + .compose(retrievedCount -> outboxEventLogDAO.getEventLogs(conn, tenantId)) + .compose(logs -> { + if (CollectionUtils.isEmpty(logs)) { + log.info("processOutboxEventLogs: No logs found in outbox table"); + return Future.succeededFuture(0); + } + log.info("processOutboxEventLogs: {} logs found in outbox table, sending to kafka", logs.size()); + return GenericCompositeFuture.join(sendEventLogsToKafka(logs, okapiHeaders)) + .map(logs.stream().map(OutboxEventLog::getEventId).toList()) + .compose(eventIds -> outboxEventLogDAO.deleteEventLogs(conn, eventIds, tenantId)) + .onSuccess(count -> log.info("processOutboxEventLogs:: {} logs have been deleted from outbox table", count)) + .onFailure(ex -> log.error("Logs deletion failed", ex)); + }) + .onSuccess(count -> log.info("processOutboxEventLogs:: Successfully processed outbox event logs: {}", count)) + .onFailure(ex -> log.error("Failed to process outbox event logs", ex))); + } + + private List> sendEventLogsToKafka(List eventLogs, Map okapiHeaders) { + return eventLogs.stream().map(eventLog -> + switch (eventLog.getEntityType()) { + case INVOICE -> { + var invoice = Json.decodeValue(eventLog.getPayload(), Invoice.class); + var action = InvoiceAuditEvent.Action.fromValue(eventLog.getAction()); + yield producer.sendInvoiceEvent(invoice, action, okapiHeaders); + } + case INVOICE_LINE -> { + var invoiceLine = Json.decodeValue(eventLog.getPayload(), InvoiceLine.class); + var action = InvoiceLineAuditEvent.Action.fromValue(eventLog.getAction()); + yield producer.sendInvoiceLineEvent(invoiceLine, action, okapiHeaders); + } + }).toList(); + } + + /** + * Saves invoice outbox log. + * + * @param conn connection in transaction + * @param entity the invoice + * @param action the event action + * @param okapiHeaders okapi headers + * @return future with saved outbox log id in the same transaction + */ + public Future saveInvoiceOutboxLog(Conn conn, Invoice entity, InvoiceAuditEvent.Action action, Map okapiHeaders) { + return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.INVOICE, entity.getId(), entity); + } + + /** + * Saves invoice line outbox log. + * + * @param conn connection in transaction + * @param entity the invoice line + * @param action the event action + * @param okapiHeaders okapi headers + * @return future with saved outbox log id in the same transaction + */ + public Future saveInvoiceLineOutboxLog(Conn conn, InvoiceLine entity, InvoiceLineAuditEvent.Action action, Map okapiHeaders) { + return saveOutboxLog(conn, okapiHeaders, action.value(), EntityType.INVOICE_LINE, entity.getId(), entity); + } + + private Future saveOutboxLog(Conn conn, Map okapiHeaders, String action, EntityType entityType, String entityId, Object entity) { + log.debug("saveOutboxLog:: Saving outbox log for {} with id: {}", entityType, entityId); + var eventLog = new OutboxEventLog() + .withEventId(UUID.randomUUID().toString()) + .withAction(action) + .withEntityType(entityType) + .withPayload(Json.encode(entity)); + return outboxEventLogDAO.saveEventLog(conn, eventLog, TenantTool.tenantId(okapiHeaders)) + .onSuccess(reply -> log.info("saveOutboxLog:: Outbox log has been saved for {} with id: {}", entityType, entityId)) + .onFailure(e -> log.warn("saveOutboxLog:: Could not save outbox audit log for {} with id: {}", entityType, entityId, e)); + } + +} diff --git a/src/main/java/org/folio/service/util/OutboxEventFields.java b/src/main/java/org/folio/service/util/OutboxEventFields.java new file mode 100644 index 00000000..ad7ce7d0 --- /dev/null +++ b/src/main/java/org/folio/service/util/OutboxEventFields.java @@ -0,0 +1,17 @@ +package org.folio.service.util; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public enum OutboxEventFields { + + EVENT_ID("event_id"), + ENTITY_TYPE("entity_type"), + ACTION("action"), + PAYLOAD("payload"); + + private final String name; + +} diff --git a/src/main/resources/templates/db_scripts/schema.json b/src/main/resources/templates/db_scripts/schema.json index a7c9dd9d..9a95be83 100644 --- a/src/main/resources/templates/db_scripts/schema.json +++ b/src/main/resources/templates/db_scripts/schema.json @@ -84,6 +84,16 @@ "run": "after", "snippetPath": "invoices_table.sql", "fromModuleVersion": "mod-invoice-storage-5.9.0" + }, + { + "run": "after", + "snippetPath": "tables/create_audit_outbox_table.sql", + "fromModuleVersion": "mod-invoice-storage-6.0.0" + }, + { + "run": "after", + "snippetPath": "tables/create_internal_lock_table.sql", + "fromModuleVersion": "mod-invoice-storage-6.0.0" } ], "tables": [ diff --git a/src/main/resources/templates/db_scripts/tables/create_audit_outbox_table.sql b/src/main/resources/templates/db_scripts/tables/create_audit_outbox_table.sql new file mode 100644 index 00000000..ff31c93e --- /dev/null +++ b/src/main/resources/templates/db_scripts/tables/create_audit_outbox_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS outbox_event_log ( + event_id uuid NOT NULL PRIMARY KEY, + entity_type text NOT NULL, + action text NOT NULL, + payload jsonb +); diff --git a/src/main/resources/templates/db_scripts/tables/create_internal_lock_table.sql b/src/main/resources/templates/db_scripts/tables/create_internal_lock_table.sql new file mode 100644 index 00000000..a441d415 --- /dev/null +++ b/src/main/resources/templates/db_scripts/tables/create_internal_lock_table.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS internal_lock ( + lock_name text NOT NULL PRIMARY KEY +); + +INSERT INTO internal_lock(lock_name) VALUES ('audit_outbox') ON CONFLICT DO NOTHING; diff --git a/src/test/java/org/folio/rest/impl/AuditOutboxAPITest.java b/src/test/java/org/folio/rest/impl/AuditOutboxAPITest.java new file mode 100644 index 00000000..04a0fb0b --- /dev/null +++ b/src/test/java/org/folio/rest/impl/AuditOutboxAPITest.java @@ -0,0 +1,27 @@ +package org.folio.rest.impl; + +import static io.restassured.RestAssured.given; +import static org.folio.rest.impl.StorageTestSuite.storageUrl; + +import java.net.MalformedURLException; + +import org.junit.jupiter.api.Test; + +public class AuditOutboxAPITest extends TestBase { + + public static final String AUDIT_OUTBOX_ENDPOINT = "/invoice-storage/audit-outbox/process"; + + @Test + void testPostInvoiceStorageAuditOutboxProcess() throws MalformedURLException { + given() + .spec(commonRequestSpec()) + .when() + .post(storageUrl(AUDIT_OUTBOX_ENDPOINT)) + .then() + .assertThat() + .statusCode(200) + .extract() + .response(); + } + +} diff --git a/src/test/java/org/folio/rest/impl/EntitiesCrudTest.java b/src/test/java/org/folio/rest/impl/EntitiesCrudTest.java index e2abc1b2..674b7104 100644 --- a/src/test/java/org/folio/rest/impl/EntitiesCrudTest.java +++ b/src/test/java/org/folio/rest/impl/EntitiesCrudTest.java @@ -20,6 +20,10 @@ class EntitiesCrudTest extends TestBase { private static final Logger log = LogManager.getLogger(EntitiesCrudTest.class); + + private static final String CREATE_EVENT = "CREATE"; + private static final String UPDATE_EVENT = "EDIT"; + private String sample = null; static Stream deleteOrder() { @@ -83,7 +87,7 @@ void testPostData(TestEntities testEntity) throws MalformedURLException { void testVerifyCollectionQuantity(TestEntities testEntity) throws MalformedURLException { log.info(String.format("--- mod-invoice-storage %s test: Verifying only 1 adjustment was created ... ", testEntity.name())); verifyCollectionQuantity(testEntity.getEndpoint(), TestEntities.BATCH_GROUP.equals(testEntity)? 2 : 1); - + verifyKafkaMessagesSentIfNeeded(CREATE_EVENT, testEntity, TENANT_HEADER.getValue(), USER_ID_HEADER.getValue(), 1); } @ParameterizedTest @@ -119,6 +123,7 @@ void testVerifyPut(TestEntities testEntity) throws MalformedURLException { log.info(String.format("--- mod-invoice-storage %s test: Fetching updated %s with ID: %s", testEntity.name(), testEntity.name(), testEntity.getId())); testFetchingUpdatedEntity(testEntity.getId(), testEntity); + verifyKafkaMessagesSentIfNeeded(UPDATE_EVENT, testEntity, TENANT_HEADER.getValue(), USER_ID_HEADER.getValue(), 1); } @ParameterizedTest diff --git a/src/test/java/org/folio/rest/impl/StorageTestSuite.java b/src/test/java/org/folio/rest/impl/StorageTestSuite.java index dc79c2b5..d9165e9b 100644 --- a/src/test/java/org/folio/rest/impl/StorageTestSuite.java +++ b/src/test/java/org/folio/rest/impl/StorageTestSuite.java @@ -1,13 +1,15 @@ package org.folio.rest.impl; +import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig; +import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace; import static org.folio.rest.impl.TestBase.TENANT_HEADER; import static org.folio.rest.utils.TenantApiTestUtil.deleteTenant; import static org.folio.rest.utils.TenantApiTestUtil.prepareTenant; -import java.io.IOException; import java.lang.reflect.Field; import java.net.MalformedURLException; import java.net.URL; +import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -17,6 +19,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.folio.kafka.KafkaTopicNameHelper; import org.folio.postgres.testing.PostgresTesterContainer; import org.folio.rest.RestVerticle; import org.folio.rest.jaxrs.model.TenantJob; @@ -36,6 +39,9 @@ import io.vertx.core.Vertx; import io.vertx.core.impl.VertxImpl; import io.vertx.core.json.JsonObject; +import lombok.SneakyThrows; +import net.mguenther.kafka.junit.EmbeddedKafkaCluster; +import net.mguenther.kafka.junit.ObserveKeyValues; public class StorageTestSuite { private static final Logger log = LogManager.getLogger(StorageTestSuite.class); @@ -45,6 +51,15 @@ public class StorageTestSuite { public static final Header URL_TO_HEADER = new Header("X-Okapi-Url-to", "http://localhost:" + port); private static TenantJob tenantJob; + public static EmbeddedKafkaCluster KAFKA_CLUSTER; + public static final String KAFKA_ENV_VALUE = "test-env"; + private static final String KAFKA_HOST = "KAFKA_HOST"; + private static final String KAFKA_PORT = "KAFKA_PORT"; + private static final String KAFKA_ENV = "ENV"; + private static final String OKAPI_URL_KEY = "OKAPI_URL"; + public static final int MOCK_KAFKA_PORT = NetworkUtils.nextFreePort(); + + private StorageTestSuite() {} public static URL storageUrl(String path) throws MalformedURLException { @@ -83,21 +98,28 @@ private static Context getContextWithReflection(Verticle verticle) { } @BeforeAll - public static void before() throws IOException, InterruptedException, ExecutionException, TimeoutException { + public static void before() throws InterruptedException, ExecutionException, TimeoutException { // tests expect English error messages only, no Danish/German/... Locale.setDefault(Locale.US); vertx = Vertx.vertx(); - log.info("Start container database"); - + log.info("Starting kafka cluster"); + KAFKA_CLUSTER = EmbeddedKafkaCluster.provisionWith(defaultClusterConfig()); + KAFKA_CLUSTER.start(); + String[] hostAndPort = KAFKA_CLUSTER.getBrokerList().split(":"); + System.setProperty(KAFKA_HOST, hostAndPort[0]); + System.setProperty(KAFKA_PORT, hostAndPort[1]); + System.setProperty(KAFKA_ENV, KAFKA_ENV_VALUE); + System.setProperty(OKAPI_URL_KEY, "http://localhost:" + MOCK_KAFKA_PORT); + log.info("Kafka cluster started with broker list: {}", KAFKA_CLUSTER.getBrokerList()); + + log.info("Starting container database"); PostgresClient.setPostgresTester(new PostgresTesterContainer()); DeploymentOptions options = new DeploymentOptions(); - options.setConfig(new JsonObject().put("http.port", port)); options.setWorker(true); - startVerticle(options); tenantJob = prepareTenant(TENANT_HEADER, false, false); @@ -106,6 +128,7 @@ public static void before() throws IOException, InterruptedException, ExecutionE @AfterAll public static void after() throws InterruptedException, ExecutionException, TimeoutException { log.info("Delete tenant"); + KAFKA_CLUSTER.stop(); deleteTenant(tenantJob, TENANT_HEADER); CompletableFuture undeploymentComplete = new CompletableFuture<>(); @@ -143,6 +166,25 @@ private static void startVerticle(DeploymentOptions options) deploymentComplete.get(60, TimeUnit.SECONDS); } + @SneakyThrows + public static List checkKafkaEventSent(String tenant, String eventType, int expected, String userId) { + String topicToObserve = formatToKafkaTopicName(tenant, eventType); + return KAFKA_CLUSTER.observeValues(ObserveKeyValues.on(topicToObserve, expected) + .filterOnHeaders(val -> { + var header = val.lastHeader(RestVerticle.OKAPI_USERID_HEADER.toLowerCase()); + if (Objects.nonNull(header)) { + return new String(header.value()).equalsIgnoreCase(userId); + } + return false; + }) + .observeFor(30, TimeUnit.SECONDS) + .build()); + } + + private static String formatToKafkaTopicName(String tenant, String eventType) { + return KafkaTopicNameHelper.formatTopicName(KAFKA_ENV_VALUE, getDefaultNameSpace(), tenant, eventType); + } + @Nested class InvoiceTestNested extends InvoiceTest {} @@ -170,4 +212,6 @@ class BatchVoucherExportsTestNested extends BatchVoucherExportsImplTest{} class VoucherNumberTestNested extends VoucherNumberTest {} @Nested class OrderStorageServiceTestNested extends OrderStorageServiceTest {} + @Nested + class AuditOutboxAPITestNested extends AuditOutboxAPITest {} } diff --git a/src/test/java/org/folio/rest/impl/TestBase.java b/src/test/java/org/folio/rest/impl/TestBase.java index 5e600fc4..e7c2444b 100644 --- a/src/test/java/org/folio/rest/impl/TestBase.java +++ b/src/test/java/org/folio/rest/impl/TestBase.java @@ -12,6 +12,8 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -22,6 +24,8 @@ import org.folio.config.ApplicationConfig; import org.folio.dbschema.ObjectMapperTool; import org.folio.rest.jaxrs.model.BatchVoucher; +import org.folio.rest.jaxrs.model.InvoiceAuditEvent; +import org.folio.rest.jaxrs.model.InvoiceLineAuditEvent; import org.folio.rest.utils.TestEntities; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -55,6 +59,11 @@ public abstract class TestBase { public static final String ID = "id"; + private final Map> kafkaMessageMethods = Map.of( + TestEntities.INVOICE, getEnumValuesAsString(InvoiceAuditEvent.Action.class), + TestEntities.INVOICE_LINES, getEnumValuesAsString(InvoiceLineAuditEvent.Action.class) + ); + @BeforeAll public static void testBaseBeforeClass() throws InterruptedException, ExecutionException, TimeoutException, IOException { Vertx vertx = StorageTestSuite.getVertx(); @@ -264,4 +273,19 @@ void assertAllFieldsExistAndEqual(JsonObject sample, Response response) { JsonObject responseJson = JsonObject.mapFrom(response.then().extract().as(BatchVoucher.class)); testAllFieldsExists(responseJson, sampleJson); } + + void verifyKafkaMessagesSentIfNeeded(String eventType, TestEntities testEntity, String tenant, String userId, int expected) { + if (kafkaMessageMethods.containsKey(testEntity) && kafkaMessageMethods.get(testEntity).contains(eventType)) { + List events = StorageTestSuite.checkKafkaEventSent(tenant, eventType, expected, userId); + assertEquals(expected, events.size()); + for (String event : events) { + assertEquals(event, eventType); + } + } + } + + private static > List getEnumValuesAsString(Class enumClass) { + return Arrays.stream(enumClass.getEnumConstants()).map(Enum::toString).toList(); + } + }