Skip to content

Commit

Permalink
[MODINVOSTO-187] Add Outbox service and produces
Browse files Browse the repository at this point in the history
  • Loading branch information
Saba-Zedginidze-EPAM committed Nov 4, 2024
1 parent 32830d9 commit c14110d
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 1 deletion.
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

<!--Folio dependencies properties-->
<folio-module-descriptor-validator.version>1.0.0</folio-module-descriptor-validator.version>
<folio-kafka-wrapper.version>3.1.1</folio-kafka-wrapper.version>

<!--Dependency properties-->
<aspectj.version>1.9.22.1</aspectj.version>
Expand Down Expand Up @@ -156,6 +157,16 @@
<version>${raml-module-builder.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>folio-kafka-wrapper</artifactId>
<version>${folio-kafka-wrapper.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
Expand Down
6 changes: 5 additions & 1 deletion ramls/audit-outbox.raml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ documentation:
content: This API is intended for internal use only by the Timer interface

types:
outbox-event-log: !include acq-models/mod-orders-storage/schemas/outbox_event_log.json
event-action: !include acq-models/mod-invoice-storage/schemas/event_action.json
event-topic: !include acq-models/mod-invoice-storage/schemas/event_topic.json
outbox-event-log: !include acq-models/mod-invoice-storage/schemas/outbox_event_log.json
invoice-audit-event: !include acq-models/mod-invoice-storage/schemas/invoice_audit_event.json
invoice-line-audit-event: !include acq-models/mod-invoice-storage/schemas/invoice_line_audit_event.json

/invoice-storage/audit-outbox:
/process:
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/org/folio/rest/impl/AuditOutboxAPI.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.folio.rest.impl;

import javax.ws.rs.core.Response;
import java.util.Map;

import org.folio.service.audit.AuditOutboxService;
import org.folio.rest.jaxrs.resource.InvoiceStorageAuditOutbox;
import org.folio.spring.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;

@Log4j2
public class AuditOutboxAPI implements InvoiceStorageAuditOutbox {

@Autowired
private AuditOutboxService auditOutboxService;

public AuditOutboxAPI() {
SpringContextUtil.autowireDependencies(this, Vertx.currentContext());
}

@Override
public void postInvoiceStorageAuditOutboxProcess(Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
auditOutboxService.processOutboxEventLogs(okapiHeaders, vertxContext)
.onSuccess(res -> asyncResultHandler.handle(Future.succeededFuture(Response.ok().build())))
.onFailure(cause -> {
log.warn("postInvoiceStorageAuditOutboxProcess:: Processing of outbox events table has failed", cause);
asyncResultHandler.handle(Future.failedFuture(cause));
});
}
}
99 changes: 99 additions & 0 deletions src/main/java/org/folio/service/audit/AuditEventProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.folio.service.audit;

import java.util.Date;
import java.util.Map;
import java.util.UUID;

import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SimpleKafkaProducerManager;
import org.folio.kafka.services.KafkaProducerRecordBuilder;
import org.folio.rest.jaxrs.model.EventAction;
import org.folio.rest.jaxrs.model.EventTopic;
import org.folio.rest.jaxrs.model.Invoice;
import org.folio.rest.jaxrs.model.InvoiceAuditEvent;
import org.folio.rest.jaxrs.model.InvoiceLineAuditEvent;
import org.folio.rest.jaxrs.model.InvoiceLine;
import org.folio.rest.tools.utils.TenantTool;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Log4j2
@RequiredArgsConstructor
public class AuditEventProducer {

private final KafkaConfig kafkaConfig;

/**
* Sends event for order change(Create, Edit, Delete) to kafka.
* OrderId is used as partition key to send all events for particular order to the same partition.
*
* @param order the event payload
* @param eventAction the event action
* @param okapiHeaders the okapi headers
* @return future with true if sending was success or failed future in another case
*/
public Future<Void> sendInvoiceEvent(Invoice invoice, EventAction eventAction, Map<String, String> okapiHeaders) {
var event = getAuditEvent(invoice, eventAction);
log.info("sendInvoiceEvent:: Sending event with id: {} and invoiceId: {} to Kafka", event.getId(), invoice.getId());
return sendToKafka(EventTopic.ACQ_INVOICE_CHANGED, event.getInvoiceId(), event, okapiHeaders)
.onFailure(t -> log.warn("sendInvoiceEvent:: Failed to send event with id: {} and invoiceId: {} to Kafka", event.getId(), invoice.getId(), t));
}

public Future<Void> sendInvoiceLineEvent(InvoiceLine invoiceLine, EventAction action, Map<String, String> okapiHeaders) {
var event = getAuditEvent(invoiceLine, action);
log.info("sendInvoiceLineEvent:: Sending event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId());
return sendToKafka(EventTopic.ACQ_INVOICE_LINE_CHANGED, event.getInvoiceId(), event, okapiHeaders)
.onFailure(t -> log.warn("sendInvoiceLineEvent:: Failed to send event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId(), t));
}

private InvoiceAuditEvent getAuditEvent(Invoice invoice, EventAction eventAction) {
return new InvoiceAuditEvent()
.withId(UUID.randomUUID().toString())
.withAction(eventAction)
.withInvoiceId(invoice.getId())
.withEventDate(new Date())
.withActionDate(invoice.getMetadata().getUpdatedDate())
.withUserId(invoice.getMetadata().getUpdatedByUserId())
.withInvoiceSnapshot(invoice.withMetadata(null));
}

private InvoiceAuditEvent getAuditEvent(InvoiceLine invoiceLine, EventAction eventAction) {
return new InvoiceLineAuditEvent()
.withId(UUID.randomUUID().toString())
.withAction(eventAction)
.withInvoiceId(invoiceLine.getId())
.withEventDate(new Date())
.withActionDate(invoiceLine.getMetadata().getUpdatedDate())
.withUserId(invoiceLine.getMetadata().getUpdatedByUserId())
.withInvoiceSnapshot(invoiceLine.withMetadata(null));
}

private Future<Void> sendToKafka(EventTopic eventTopic, String key, Object eventPayload, Map<String, String> okapiHeaders) {
var tenantId = TenantTool.tenantId(okapiHeaders);
var topicName = buildTopicName(kafkaConfig.getEnvId(), tenantId, eventTopic.value());
KafkaProducerRecord<String, String> kafkaProducerRecord = new KafkaProducerRecordBuilder<String, Object>(tenantId)
.key(key)
.value(eventPayload)
.topic(topicName)
.propagateOkapiHeaders(okapiHeaders)
.build();

var producerManager = new SimpleKafkaProducerManager(Vertx.currentContext().owner(), kafkaConfig);
KafkaProducer<String, String> producer = producerManager.createShared(topicName);
return producer.send(kafkaProducerRecord)
.onComplete(reply -> producer.end(ear -> producer.close()))
.onSuccess(s -> log.info("sendToKafka:: Event for {} with id '{}' has been sent to kafka topic '{}'", eventTopic, key, topicName))
.onFailure(t -> log.error("Failed to send event for {} with id '{}' to kafka topic '{}'", eventTopic, key, topicName, t))
.mapEmpty();
}

private String buildTopicName(String envId, String tenantId, String eventType) {
return KafkaTopicNameHelper.formatTopicName(envId, KafkaTopicNameHelper.getDefaultNameSpace(), tenantId, eventType);
}
}
109 changes: 109 additions & 0 deletions src/main/java/org/folio/service/audit/AuditOutboxService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package org.folio.service.audit;

import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.commons.collections4.CollectionUtils;
import org.folio.dao.audit.AuditOutboxEventLogDAO;
import org.folio.dao.lock.InternalLockDAO;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.rest.jaxrs.model.EventAction;
import org.folio.rest.jaxrs.model.Invoice;
import org.folio.rest.jaxrs.model.InvoiceLine;
import org.folio.rest.jaxrs.model.OutboxEventLog;
import org.folio.rest.jaxrs.model.OutboxEventLog.EntityType;
import org.folio.rest.persist.Conn;
import org.folio.rest.persist.DBClient;
import org.folio.rest.tools.utils.TenantTool;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.json.Json;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Log4j2
@RequiredArgsConstructor
public class AuditOutboxService {

private static final String OUTBOX_LOCK_NAME = "audit_outbox";

private final AuditOutboxEventLogDAO outboxEventLogDAO;
private final InternalLockDAO internalLockDAO;
private final AuditEventProducer producer;

/**
* Reads outbox event logs from DB and send them to Kafka and delete from outbox table in a single transaction.
*
* @param okapiHeaders the okapi headers
* @param vertxContext the vertx context
* @return future with integer how many records have been processed
*/
public Future<Integer> processOutboxEventLogs(Map<String, String> okapiHeaders, Context vertxContext) {
var tenantId = TenantTool.tenantId(okapiHeaders);
return new DBClient(vertxContext, okapiHeaders).getPgClient()
.withTrans(conn -> internalLockDAO.selectWithLocking(conn, OUTBOX_LOCK_NAME, tenantId)
.compose(retrievedCount -> outboxEventLogDAO.getEventLogs(conn, tenantId))
.compose(logs -> {
if (CollectionUtils.isEmpty(logs)) {
log.info("processOutboxEventLogs: No logs found in outbox table");
return Future.succeededFuture(0);
}
log.info("processOutboxEventLogs: {} logs found in outbox table, sending to kafka", logs.size());
return GenericCompositeFuture.join(sendEventLogsToKafka(logs, okapiHeaders))
.map(logs.stream().map(OutboxEventLog::getEventId).toList())
.compose(eventIds -> outboxEventLogDAO.deleteEventLogs(conn, eventIds, tenantId))
.onSuccess(count -> log.info("processOutboxEventLogs:: {} logs have been deleted from outbox table", count))
.onFailure(ex -> log.error("Logs deletion failed", ex));
})
);
}

private List<Future<Void>> sendEventLogsToKafka(List<OutboxEventLog> eventLogs, Map<String, String> okapiHeaders) {
return eventLogs.stream().map(eventLog ->
switch (eventLog.getEntityType()) {
case INVOICE -> producer.sendInvoiceEvent(Json.decodeValue(eventLog.getPayload(), Invoice.class), eventLog.getAction(), okapiHeaders);
case INVOICE_LINE -> producer.sendInvoiceLineEvent(Json.decodeValue(eventLog.getPayload(), InvoiceLine.class), eventLog.getAction(), okapiHeaders);
}).toList();
}

/**
* Saves invoice outbox log.
*
* @param conn connection in transaction
* @param entity the invoice
* @param action the event action
* @param okapiHeaders okapi headers
* @return future with saved outbox log id in the same transaction
*/
public Future<String> saveInvoiceOutboxLog(Conn conn, Invoice entity, EventAction action, Map<String, String> okapiHeaders) {
return saveOutboxLog(conn, okapiHeaders, action, EntityType.INVOICE, entity.getId(), entity);
}

/**
* Saves invoice line outbox log.
*
* @param conn connection in transaction
* @param entity the invoice line
* @param action the event action
* @param okapiHeaders okapi headers
* @return future with saved outbox log id in the same transaction
*/
public Future<String> saveInvoiceLineOutboxLog(Conn conn, InvoiceLine entity, EventAction action, Map<String, String> okapiHeaders) {
return saveOutboxLog(conn, okapiHeaders, action, EntityType.INVOICE_LINE, entity.getId(), entity);
}

private Future<String> saveOutboxLog(Conn conn, Map<String, String> okapiHeaders, EventAction action, EntityType entityType, String entityId, Object entity) {
log.debug("saveOutboxLog:: Saving outbox log for {} with id: {}", entityType, entityId);
var eventLog = new OutboxEventLog()
.withEventId(UUID.randomUUID().toString())
.withAction(action)
.withEntityType(entityType)
.withPayload(Json.encode(entity));
return outboxEventLogDAO.saveEventLog(conn, eventLog, TenantTool.tenantId(okapiHeaders))
.onSuccess(reply -> log.info("saveOutboxLog:: Outbox log has been saved for {} with id: {}", entityType, entityId))
.onFailure(e -> log.warn("saveOutboxLog:: Could not save outbox audit log for {} with id: {}", entityType, entityId, e));
}

}

0 comments on commit c14110d

Please sign in to comment.