Skip to content

Commit

Permalink
[MODAUD-195]. Implement consumer & endpoint for invoice records (#175)
Browse files Browse the repository at this point in the history
* [MODAUD-195]. Implement consumer & endpoint for invoice records

* [MODAUD-195]. Fix sonar issues

* [MODAUD-195]. Add final modified to test constants

* [MODAUD-195]. Add worker pool size env vars to each consumer

* [MODORDERS-1208]. Apply review recommendations

* [MODORDERS-1208]. Remove unused promises

* [MODORDERS-1208]. Remove all promises & unnecessary loggers

* [MODAUD-195]. Set additionalProperties=true (open to modifications)

* [MODAUD-195]. Remove duplicated logger
  • Loading branch information
BKadirkhodjaev authored Nov 4, 2024
1 parent 727953c commit 3829790
Show file tree
Hide file tree
Showing 26 changed files with 874 additions and 27 deletions.
23 changes: 22 additions & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@
}
]
},
{
"id": "acquisition-invoice-events",
"version": "1.0",
"handlers": [
{
"methods": [
"GET"
],
"pathPattern": "/audit-data/acquisition/invoice/{id}",
"permissionsRequired": [
"acquisition.invoice.events.get"
]
}
]
},
{
"id": "circulation-logs",
"version": "1.2",
Expand Down Expand Up @@ -264,6 +279,11 @@
"displayName": "Acquisition piece status change history events - get piece status change events",
"description": "Get piece status change events"
},
{
"permissionName": "acquisition.invoice.events.get",
"displayName": "Acquisition invoice events - get invoice change events",
"description": "Get invoice change events"
},
{
"permissionName": "audit.all",
"displayName": "Audit - all permissions",
Expand All @@ -278,7 +298,8 @@
"acquisition.order.events.get",
"acquisition.order-line.events.get",
"acquisition.piece.events.get",
"acquisition.piece.events.history.get"
"acquisition.piece.events.history.get",
"acquisition.invoice.events.get"
]
}
],
Expand Down
4 changes: 2 additions & 2 deletions mod-audit-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>mod-audit-server</artifactId>
<version>2.10.1-SNAPSHOT</version>
<version>3.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<parent>
<artifactId>mod-audit</artifactId>
<groupId>org.folio</groupId>
<version>2.10.1-SNAPSHOT</version>
<version>3.0.0-SNAPSHOT</version>
</parent>

<licenses>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.folio.dao.acquisition;

import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import org.folio.rest.jaxrs.model.InvoiceAuditEvent;
import org.folio.rest.jaxrs.model.InvoiceAuditEventCollection;

public interface InvoiceEventsDao {

/**
* Saves invoiceAuditEvent entity to DB
*
* @param invoiceAuditEvent InvoiceAuditEvent entity to save
* @param tenantId tenant id
* @return future with created row
*/
Future<RowSet<Row>> save(InvoiceAuditEvent invoiceAuditEvent, String tenantId);

/**
* Searches for invoice audit events by id
*
* @param invoiceId invoice id
* @param sortBy sort by
* @param sortInvoice sort invoice
* @param limit limit
* @param offset offset
* @param tenantId tenant id
* @return future with InvoiceAuditEventCollection
*/
Future<InvoiceAuditEventCollection> getAuditEventsByInvoiceId(String invoiceId, String sortBy, String sortInvoice, int limit, int offset, String tenantId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.folio.dao.acquisition.impl;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.acquisition.InvoiceEventsDao;
import org.folio.rest.jaxrs.model.InvoiceAuditEvent;
import org.folio.rest.jaxrs.model.InvoiceAuditEventCollection;
import org.folio.util.PostgresClientFactory;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.UUID;

import static java.lang.String.format;
import static org.folio.util.AuditEventDBConstants.ACTION_DATE_FIELD;
import static org.folio.util.AuditEventDBConstants.ACTION_FIELD;
import static org.folio.util.AuditEventDBConstants.EVENT_DATE_FIELD;
import static org.folio.util.AuditEventDBConstants.ID_FIELD;
import static org.folio.util.AuditEventDBConstants.INVOICE_ID_FIELD;
import static org.folio.util.AuditEventDBConstants.MODIFIED_CONTENT_FIELD;
import static org.folio.util.AuditEventDBConstants.ORDER_BY_PATTERN;
import static org.folio.util.AuditEventDBConstants.TOTAL_RECORDS_FIELD;
import static org.folio.util.AuditEventDBConstants.USER_ID_FIELD;
import static org.folio.util.DbUtils.formatDBTableName;

@Repository
public class InvoiceEventsDaoImpl implements InvoiceEventsDao {

private static final Logger LOGGER = LogManager.getLogger();

public static final String TABLE_NAME = "acquisition_invoice_log";

public static final String GET_BY_INVOICE_ID_SQL = "SELECT id, action, invoice_id, user_id, event_date, action_date, modified_content_snapshot," +
" (SELECT count(*) AS total_records FROM %s WHERE invoice_id = $1) FROM %s WHERE invoice_id = $1 %s LIMIT $2 OFFSET $3";

public static final String INSERT_SQL = "INSERT INTO %s (id, action, invoice_id, user_id, event_date, action_date, modified_content_snapshot)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7)";

private final PostgresClientFactory pgClientFactory;

public InvoiceEventsDaoImpl(PostgresClientFactory pgClientFactory) {
this.pgClientFactory = pgClientFactory;
}

@Override
public Future<RowSet<Row>> save(InvoiceAuditEvent invoiceAuditEvent, String tenantId) {
LOGGER.debug("save:: Saving Invoice AuditEvent with tenant id : {}", tenantId);
String logTable = formatDBTableName(tenantId, TABLE_NAME);
String query = format(INSERT_SQL, logTable);
return makeSaveCall(query, invoiceAuditEvent, tenantId)
.onSuccess(rows -> LOGGER.info("save:: Saved Invoice AuditEvent with tenant id : {}", tenantId))
.onFailure(e -> LOGGER.error("Failed to save record with id: {} for invoice id: {} in to table {}",
invoiceAuditEvent.getId(), invoiceAuditEvent.getInvoiceId(), TABLE_NAME, e));
}

@Override
public Future<InvoiceAuditEventCollection> getAuditEventsByInvoiceId(String invoiceId, String sortBy, String sortInvoice, int limit, int offset, String tenantId) {
LOGGER.debug("getAuditEventsByInvoiceId:: Retrieving AuditEvent with invoice id : {}", invoiceId);
String logTable = formatDBTableName(tenantId, TABLE_NAME);
String query = format(GET_BY_INVOICE_ID_SQL, logTable, logTable, format(ORDER_BY_PATTERN, sortBy, sortInvoice));
return pgClientFactory.createInstance(tenantId).execute(query, Tuple.of(UUID.fromString(invoiceId), limit, offset))
.map(rowSet -> rowSet.rowCount() == 0 ? new InvoiceAuditEventCollection().withTotalItems(0)
: mapRowToListOfInvoiceEvent(rowSet));
}

private Future<RowSet<Row>> makeSaveCall(String query, InvoiceAuditEvent invoiceAuditEvent, String tenantId) {
LOGGER.debug("makeSaveCall:: Making save call with query : {} and tenant id : {}", query, tenantId);
try {
return pgClientFactory.createInstance(tenantId).execute(query, Tuple.of(invoiceAuditEvent.getId(),
invoiceAuditEvent.getAction(),
invoiceAuditEvent.getInvoiceId(),
invoiceAuditEvent.getUserId(),
LocalDateTime.ofInstant(invoiceAuditEvent.getEventDate().toInstant(), ZoneId.systemDefault()),
LocalDateTime.ofInstant(invoiceAuditEvent.getActionDate().toInstant(), ZoneId.systemDefault()),
JsonObject.mapFrom(invoiceAuditEvent.getInvoiceSnapshot())));
} catch (Exception e) {
return Future.failedFuture(e);
}
}

private InvoiceAuditEventCollection mapRowToListOfInvoiceEvent(RowSet<Row> rowSet) {
LOGGER.debug("mapRowToListOfInvoiceEvent:: Mapping row to List of Invoice Events");
InvoiceAuditEventCollection invoiceAuditEventCollection = new InvoiceAuditEventCollection();
rowSet.iterator().forEachRemaining(row -> {
invoiceAuditEventCollection.getInvoiceAuditEvents().add(mapRowToInvoiceEvent(row));
invoiceAuditEventCollection.setTotalItems(row.getInteger(TOTAL_RECORDS_FIELD));
});
LOGGER.debug("mapRowToListOfInvoiceEvent:: Mapped row to List of Invoice Events");
return invoiceAuditEventCollection;
}

private InvoiceAuditEvent mapRowToInvoiceEvent(Row row) {
LOGGER.debug("mapRowToInvoiceEvent:: Mapping row to Invoice Event");
return new InvoiceAuditEvent()
.withId(row.getValue(ID_FIELD).toString())
.withAction(row.get(InvoiceAuditEvent.Action.class, ACTION_FIELD))
.withInvoiceId(row.getValue(INVOICE_ID_FIELD).toString())
.withUserId(row.getValue(USER_ID_FIELD).toString())
.withEventDate(Date.from(row.getLocalDateTime(EVENT_DATE_FIELD).toInstant(ZoneOffset.UTC)))
.withActionDate(Date.from(row.getLocalDateTime(ACTION_DATE_FIELD).toInstant(ZoneOffset.UTC)))
.withInvoiceSnapshot(JsonObject.mapFrom(row.getValue(MODIFIED_CONTENT_FIELD)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import io.vertx.core.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.rest.jaxrs.model.AuditDataAcquisitionInvoiceIdGetSortOrder;
import org.folio.rest.jaxrs.model.AuditDataAcquisitionOrderIdGetSortOrder;
import org.folio.rest.jaxrs.model.AuditDataAcquisitionOrderLineIdGetSortOrder;
import org.folio.rest.jaxrs.model.AuditDataAcquisitionPieceIdGetSortOrder;
import org.folio.rest.jaxrs.model.AuditDataAcquisitionPieceIdStatusChangeHistoryGetSortOrder;
import org.folio.rest.jaxrs.resource.AuditDataAcquisition;
import org.folio.rest.tools.utils.TenantTool;
import org.folio.services.acquisition.InvoiceAuditEventsService;
import org.folio.services.acquisition.OrderAuditEventsService;
import org.folio.services.acquisition.OrderLineAuditEventsService;
import org.folio.services.acquisition.PieceAuditEventsService;
Expand All @@ -35,6 +37,8 @@ public class AuditDataAcquisitionImpl implements AuditDataAcquisition {
private OrderLineAuditEventsService orderLineAuditEventsService;
@Autowired
private PieceAuditEventsService pieceAuditEventsService;
@Autowired
private InvoiceAuditEventsService invoiceAuditEventsService;

public AuditDataAcquisitionImpl() {
SpringContextUtil.autowireDependencies(this, Vertx.currentContext());
Expand Down Expand Up @@ -110,6 +114,24 @@ public void getAuditDataAcquisitionPieceStatusChangeHistoryById(String pieceId,
}
}

@Override
public void getAuditDataAcquisitionInvoiceById(String invoiceId, String sortBy,
AuditDataAcquisitionInvoiceIdGetSortOrder sortOrder,
int limit, int offset, Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
LOGGER.debug("getAuditDataAcquisitionOrderLineById:: Retrieving Audit Data Acquisition Invoice Line By Id : {}", invoiceId);
String tenantId = TenantTool.tenantId(okapiHeaders);
try {
invoiceAuditEventsService.getAuditEventsByInvoiceId(invoiceId, sortBy, sortOrder.name(), limit, offset, tenantId)
.map(GetAuditDataAcquisitionInvoiceByIdResponse::respond200WithApplicationJson)
.map(Response.class::cast)
.otherwise(this::mapExceptionToResponse)
.onComplete(asyncResultHandler);
} catch (Exception e) {
LOGGER.error("Failed to get invoice audit events by piece id: {}", invoiceId, e);
asyncResultHandler.handle(Future.succeededFuture(mapExceptionToResponse(e)));
}
}

private Response mapExceptionToResponse(Throwable throwable) {
LOGGER.debug("mapExceptionToResponse:: Mapping Exception :{} to Response", throwable.getMessage(), throwable);
return GetAuditDataAcquisitionOrderByIdResponse
Expand Down
44 changes: 29 additions & 15 deletions mod-audit-server/src/main/java/org/folio/rest/impl/InitAPIs.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import io.vertx.core.spi.VerticleFactory;
import org.apache.logging.log4j.LogManager;
Expand All @@ -15,6 +16,7 @@
import org.folio.rest.resource.interfaces.InitAPI;
import org.folio.spring.SpringContextUtil;
import org.folio.verticle.SpringVerticleFactory;
import org.folio.verticle.acquisition.InvoiceEventConsumersVerticle;
import org.folio.verticle.acquisition.OrderEventConsumersVerticle;
import org.folio.verticle.acquisition.OrderLineEventConsumersVerticle;
import org.folio.verticle.acquisition.PieceEventConsumersVerticle;
Expand All @@ -29,10 +31,23 @@ public class InitAPIs implements InitAPI {

@Value("${acq.orders.kafka.consumer.instancesNumber:1}")
private int acqOrderConsumerInstancesNumber;
@Value("${acq.orders.kafka.consumer.pool.size:5}")
private int acqOrderConsumerPoolSize;

@Value("${acq.order-lines.kafka.consumer.instancesNumber:1}")
private int acqOrderLineConsumerInstancesNumber;
@Value("${acq.order-lines.kafka.consumer.pool.size:5}")
private int acqOrderLineConsumerPoolSize;

@Value("${acq.pieces.kafka.consumer.instancesNumber:1}")
private int acqPieceConsumerInstancesNumber;
@Value("${acq.orders.kafka.consumer.pool.size:5}")
private int acqPieceConsumerPoolSize;

@Value("${acq.invoices.kafka.consumer.instancesNumber:1}")
private int acqInvoiceConsumerInstancesNumber;
@Value("${acq.orders.kafka.consumer.pool.size:5}")
private int acqInvoiceConsumerPoolSize;

@Override
public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> handler) {
Expand Down Expand Up @@ -64,27 +79,26 @@ private Future<?> deployConsumersVerticles(Vertx vertx) {
Promise<String> orderEventsConsumer = Promise.promise();
Promise<String> orderLineEventsConsumer = Promise.promise();
Promise<String> pieceEventsConsumer = Promise.promise();
Promise<String> invoiceEventsConsumer = Promise.promise();

vertx.deployVerticle(getVerticleName(verticleFactory, OrderEventConsumersVerticle.class),
new DeploymentOptions()
.setWorker(true)
.setInstances(acqOrderConsumerInstancesNumber), orderEventsConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, OrderLineEventConsumersVerticle.class),
new DeploymentOptions()
.setWorker(true)
.setInstances(acqOrderLineConsumerInstancesNumber), orderLineEventsConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, PieceEventConsumersVerticle.class),
new DeploymentOptions()
.setWorker(true)
.setInstances(acqPieceConsumerInstancesNumber), pieceEventsConsumer);
deployVerticle(vertx, verticleFactory, OrderEventConsumersVerticle.class, acqOrderConsumerInstancesNumber, acqOrderConsumerPoolSize, orderEventsConsumer);
deployVerticle(vertx, verticleFactory, OrderLineEventConsumersVerticle.class, acqOrderLineConsumerInstancesNumber, acqOrderLineConsumerPoolSize, orderLineEventsConsumer);
deployVerticle(vertx, verticleFactory, PieceEventConsumersVerticle.class, acqPieceConsumerInstancesNumber, acqPieceConsumerPoolSize, pieceEventsConsumer);
deployVerticle(vertx, verticleFactory, InvoiceEventConsumersVerticle.class, acqInvoiceConsumerInstancesNumber, acqInvoiceConsumerPoolSize, invoiceEventsConsumer);

LOGGER.info("deployConsumersVerticles:: All consumer verticles were successfully deployed");
return GenericCompositeFuture.all(Arrays.asList(
orderEventsConsumer.future(),
orderLineEventsConsumer.future(),
pieceEventsConsumer.future()));
pieceEventsConsumer.future(),
invoiceEventsConsumer.future()));
}

private <T> void deployVerticle(Vertx vertx, VerticleFactory verticleFactory, Class<T> consumerClass,
int acqOrderConsumerInstancesNumber, int acqOrderConsumerPoolSize, Promise<String> orderEventsConsumer) {
DeploymentOptions deploymentOptions = new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)
.setInstances(acqOrderConsumerInstancesNumber).setWorkerPoolSize(acqOrderConsumerPoolSize);
vertx.deployVerticle(getVerticleName(verticleFactory, consumerClass), deploymentOptions, orderEventsConsumer);
}

private <T> String getVerticleName(VerticleFactory verticleFactory, Class<T> clazz) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.folio.services.acquisition;

import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import org.folio.rest.jaxrs.model.InvoiceAuditEvent;
import org.folio.rest.jaxrs.model.InvoiceAuditEventCollection;

public interface InvoiceAuditEventsService {

/**
* Saves InvoiceAuditEvent
*
* @param invoiceAuditEvent
* @param tenantId id of tenant
* @return successful future if event has not been processed, or failed future otherwise
*/
Future<RowSet<Row>> saveInvoiceAuditEvent(InvoiceAuditEvent invoiceAuditEvent, String tenantId);

/**
* Searches for invoice audit events by invoice id
*
* @param invoiceId invoice id
* @param sortBy sort by
* @param sortInvoice sort invoice
* @param limit limit
* @param offset offset
* @return future with InvoiceAuditEventCollection
*/
Future<InvoiceAuditEventCollection> getAuditEventsByInvoiceId(String invoiceId, String sortBy, String sortInvoice, int limit, int offset, String tenantId);
}
Loading

0 comments on commit 3829790

Please sign in to comment.