Skip to content

Commit

Permalink
Merge branch 'master' into MODINVOICE-569
Browse files Browse the repository at this point in the history
  • Loading branch information
azizbekxm authored Nov 15, 2024
2 parents dac8893 + 6f949c1 commit 90f3779
Show file tree
Hide file tree
Showing 31 changed files with 914 additions and 95 deletions.
18 changes: 17 additions & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,20 @@
"permissionsRequired": ["batch-group-storage.batch-groups.item.delete"]
}
]
},
{
"id": "_timer",
"version": "1.0",
"interfaceType": "system",
"handlers": [
{
"methods": ["POST"],
"pathPattern": "/invoice-storage/audit-outbox/process",
"modulePermissions": [],
"unit": "minute",
"delay": "30"
}
]
}
],
"permissionSets": [
Expand Down Expand Up @@ -703,7 +717,9 @@
{ "name": "DB_DATABASE", "value": "okapi_modules" },
{ "name": "DB_QUERYTIMEOUT", "value": "60000" },
{ "name": "DB_CHARSET", "value": "UTF-8" },
{ "name": "DB_MAXPOOLSIZE", "value": "5" }
{ "name": "DB_MAXPOOLSIZE", "value": "5" },
{ "name": "KAFKA_HOST", "value": "10.0.2.15" },
{ "name": "KAFKA_PORT", "value": "9092" }
]
}
}
45 changes: 45 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@

<!--Dependency Management Properties-->
<vertx.version>4.5.10</vertx.version>
<kafkaclients.version>3.6.1</kafkaclients.version>
<log4j.version>2.24.1</log4j.version>

<!--Folio dependencies properties-->
<folio-module-descriptor-validator.version>1.0.0</folio-module-descriptor-validator.version>
<folio-kafka-wrapper.version>3.2.0</folio-kafka-wrapper.version>

<!--Dependency properties-->
<aspectj.version>1.9.22.1</aspectj.version>
Expand Down Expand Up @@ -156,6 +158,16 @@
<version>${raml-module-builder.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>folio-kafka-wrapper</artifactId>
<version>${folio-kafka-wrapper.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
Expand All @@ -172,6 +184,35 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafkaclients.version}</version>
</dependency>
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>3.6.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.13</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -343,6 +384,10 @@
<XaddSerialVersionUID>true</XaddSerialVersionUID>
<showWeaveInfo>false</showWeaveInfo>
<forceAjcCompile>true</forceAjcCompile>
<includes>
<include>**/impl/*.java</include>
<include>**/*.aj</include>
</includes>
<aspectLibraries>
<aspectLibrary>
<groupId>org.folio</groupId>
Expand Down
20 changes: 20 additions & 0 deletions ramls/audit-outbox.raml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#%RAML 1.0
title: Audit outbox API
version: v1.0
protocols: [ HTTP, HTTPS ]
baseUri: http://github.com/folio-org/mod-invoice-storage

documentation:
- title: Audit outbox API
content: This API is intended for internal use only by the Timer interface

types:
outbox-event-log: !include acq-models/mod-invoice-storage/schemas/outbox_event_log.json
invoice-audit-event: !include acq-models/mod-invoice-storage/schemas/invoice_audit_event.json
invoice-line-audit-event: !include acq-models/mod-invoice-storage/schemas/invoice_line_audit_event.json
event-topic: !include acq-models/mod-invoice-storage/schemas/event_topic.json

/invoice-storage/audit-outbox:
/process:
post:
description: Read audit events from DB and send them to Kafka
40 changes: 38 additions & 2 deletions src/main/java/org/folio/config/ApplicationConfig.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package org.folio.config;

import org.folio.dao.audit.AuditOutboxEventLogDAO;
import org.folio.dao.audit.AuditOutboxEventLogPostgresDAO;
import org.folio.dao.invoice.InvoiceDAO;
import org.folio.dao.invoice.InvoicePostgresDAO;
import org.folio.dao.lines.InvoiceLinesDAO;
import org.folio.dao.lines.InvoiceLinesPostgresDAO;
import org.folio.dao.lock.InternalLockDAO;
import org.folio.dao.lock.InternalLockPostgresDAO;
import org.folio.kafka.KafkaConfig;
import org.folio.rest.core.RestClient;
import org.folio.service.InvoiceLineNumberService;
import org.folio.service.InvoiceLineStorageService;
import org.folio.service.InvoiceStorageService;
import org.folio.service.audit.AuditEventProducer;
import org.folio.service.audit.AuditOutboxService;
import org.folio.service.order.OrderStorageService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import(KafkaConfiguration.class)
public class ApplicationConfig {

@Bean
Expand All @@ -34,12 +44,38 @@ public InvoiceLinesDAO invoiceLinesDAO() {
}

@Bean
public InvoiceStorageService invoiceStorageService(InvoiceDAO invoiceDAO) {
return new InvoiceStorageService(invoiceDAO);
public AuditOutboxEventLogDAO auditOutboxEventLogDAO() {
return new AuditOutboxEventLogPostgresDAO();
}

@Bean
public InternalLockDAO internalLockDAO() {
return new InternalLockPostgresDAO();
}

@Bean
public InvoiceStorageService invoiceStorageService(InvoiceDAO invoiceDAO, AuditOutboxService auditOutboxService) {
return new InvoiceStorageService(invoiceDAO, auditOutboxService);
}

@Bean
public InvoiceLineStorageService invoiceLineStorageService(InvoiceLinesDAO invoiceLinesDAO, AuditOutboxService auditOutboxService) {
return new InvoiceLineStorageService(invoiceLinesDAO, auditOutboxService);
}

@Bean
public InvoiceLineNumberService invoiceLineNumberService(InvoiceDAO invoiceDAO, InvoiceLinesDAO invoiceLinesDAO) {
return new InvoiceLineNumberService(invoiceDAO, invoiceLinesDAO);
}

@Bean
public AuditEventProducer auditEventProducer(KafkaConfig kafkaConfig) {
return new AuditEventProducer(kafkaConfig);
}

@Bean
public AuditOutboxService auditOutboxService(AuditOutboxEventLogDAO auditOutboxEventLogDAO, InternalLockDAO internalLockDAO, AuditEventProducer producer) {
return new AuditOutboxService(auditOutboxEventLogDAO, internalLockDAO, producer);
}

}
36 changes: 36 additions & 0 deletions src/main/java/org/folio/config/KafkaConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.folio.config;

import org.folio.kafka.KafkaConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfiguration {

@Value("${KAFKA_HOST:kafka}")
private String kafkaHost;
@Value("${KAFKA_PORT:9092}")
private String kafkaPort;
@Value("${OKAPI_URL:http://okapi:9130}")
private String okapiUrl;
@Value("${REPLICATION_FACTOR:1}")
private int replicationFactor;
@Value("${MAX_REQUEST_SIZE:1048576}")
private int maxRequestSize;
@Value("${ENV:folio}")
private String envId;

@Bean
public KafkaConfig kafkaConfig() {
return KafkaConfig.builder()
.envId(envId)
.kafkaHost(kafkaHost)
.kafkaPort(kafkaPort)
.okapiUrl(okapiUrl)
.replicationFactor(replicationFactor)
.maxRequestSize(maxRequestSize)
.build();
}

}
27 changes: 27 additions & 0 deletions src/main/java/org/folio/dao/DbUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.folio.dao;

import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard;

import io.vertx.core.Future;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;

public class DbUtils {

private static final String TABLE_NAME_TEMPLATE = "%s.%s";

public static String getTenantTableName(String tenantId, String tableName) {
return TABLE_NAME_TEMPLATE.formatted(convertToPsqlStandard(tenantId), tableName);
}

public static Future<Void> verifyEntityUpdate(RowSet<Row> updated) {
return updated.rowCount() == 1
? Future.succeededFuture()
: Future.failedFuture(new HttpException(NOT_FOUND.getStatusCode(), NOT_FOUND.getReasonPhrase()));
}

private DbUtils() {}

}
18 changes: 18 additions & 0 deletions src/main/java/org/folio/dao/audit/AuditOutboxEventLogDAO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.folio.dao.audit;

import java.util.List;

import org.folio.rest.jaxrs.model.OutboxEventLog;
import org.folio.rest.persist.Conn;

import io.vertx.core.Future;

public interface AuditOutboxEventLogDAO {

Future<List<OutboxEventLog>> getEventLogs(Conn conn, String tenantId);

Future<Void> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId);

Future<Integer> deleteEventLogs(Conn conn, List<String> eventIds, String tenantId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.folio.dao.audit;

import static org.folio.dao.DbUtils.getTenantTableName;

import java.util.List;
import java.util.UUID;

import org.folio.rest.jaxrs.model.OutboxEventLog;
import org.folio.rest.persist.Conn;
import org.folio.service.util.OutboxEventFields;

import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Tuple;
import lombok.extern.log4j.Log4j2;
import one.util.streamex.StreamEx;

@Log4j2
public class AuditOutboxEventLogPostgresDAO implements AuditOutboxEventLogDAO {

public static final String OUTBOX_TABLE_NAME = "outbox_event_log";
private static final String SELECT_SQL = "SELECT * FROM %s LIMIT 1000";
private static final String INSERT_SQL = "INSERT INTO %s (event_id, entity_type, action, payload) VALUES ($1, $2, $3, $4)";
private static final String BATCH_DELETE_SQL = "DELETE from %s where event_id = ANY ($1)";

/**
* Get all event logs from outbox table.
*
* @param conn the sql connection from transaction
* @param tenantId the tenant id
* @return future of a list of fetched event logs
*/
public Future<List<OutboxEventLog>> getEventLogs(Conn conn, String tenantId) {
log.trace("getEventLogs:: Fetching event logs from outbox table for tenantId: '{}'", tenantId);
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME);
return conn.execute(SELECT_SQL.formatted(tableName))
.map(rows -> StreamEx.of(rows.iterator()).map(this::convertDbRowToOutboxEventLog).toList())
.onFailure(t -> log.warn("getEventLogs:: Failed to fetch event logs for tenantId: '{}'", tenantId, t));
}

/**
* Saves event log to outbox table.
*
* @param conn the sql connection from transaction
* @param eventLog the event log to save
* @param tenantId the tenant id
* @return future of id of the inserted entity
*/
public Future<Void> saveEventLog(Conn conn, OutboxEventLog eventLog, String tenantId) {
log.debug("saveEventLog:: Saving event log to outbox table with eventId: '{}'", eventLog.getEventId());
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME);
Tuple params = Tuple.of(eventLog.getEventId(), eventLog.getEntityType().value(), eventLog.getAction(), eventLog.getPayload());
return conn.execute(INSERT_SQL.formatted(tableName), params)
.onFailure(t -> log.warn("saveEventLog:: Failed to save event log with id: '{}'", eventLog.getEventId(), t))
.mapEmpty();
}

/**
* Deletes outbox logs by event ids in batch.
*
* @param conn the sql connection from transaction
* @param eventIds the event ids to delete
* @param tenantId the tenant id
* @return future of row count for deleted records
*/
public Future<Integer> deleteEventLogs(Conn conn, List<String> eventIds, String tenantId) {
log.debug("deleteEventLogs:: Deleting outbox logs by event ids in batch: '{}'", eventIds);
var tableName = getTenantTableName(tenantId, OUTBOX_TABLE_NAME);
var param = eventIds.stream().map(UUID::fromString).toArray(UUID[]::new);
return conn.execute(BATCH_DELETE_SQL.formatted(tableName), Tuple.of(param))
.map(SqlResult::rowCount)
.onFailure(t -> log.warn("deleteEventLogs: Failed to delete event logs by ids: '{}'", eventIds, t));
}

private OutboxEventLog convertDbRowToOutboxEventLog(Row row) {
return new OutboxEventLog()
.withEventId(row.getUUID(OutboxEventFields.EVENT_ID.getName()).toString())
.withEntityType(OutboxEventLog.EntityType.fromValue(row.getString(OutboxEventFields.ENTITY_TYPE.getName())))
.withAction(row.getString(OutboxEventFields.ACTION.getName()))
.withPayload(row.getString(OutboxEventFields.PAYLOAD.getName()));
}

}
4 changes: 2 additions & 2 deletions src/main/java/org/folio/dao/invoice/InvoiceDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
public interface InvoiceDAO {

Future<Invoice> getInvoiceByIdForUpdate(String invoiceId, Conn conn);
Future<DBClient> createInvoice(Invoice invoice, DBClient client);
Future<Void> updateInvoice(Invoice invoice, Conn conn);
Future<String> createInvoice(Invoice invoice, Conn conn);
Future<Void> updateInvoice(String id, Invoice invoice, Conn conn);
Future<DBClient> deleteInvoice(String id, DBClient client);
Future<DBClient> deleteInvoiceLinesByInvoiceId(String id, DBClient client);
Future<DBClient> deleteInvoiceDocumentsByInvoiceId(String id, DBClient client);
Expand Down
Loading

0 comments on commit 90f3779

Please sign in to comment.