Skip to content

Commit

Permalink
[MODINVOSTO-187] Address comments and add kafka junit
Browse files Browse the repository at this point in the history
  • Loading branch information
Saba-Zedginidze-EPAM committed Nov 7, 2024
1 parent eb7f6a1 commit a2244e6
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 90 deletions.
26 changes: 25 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

<!--Folio dependencies properties-->
<folio-module-descriptor-validator.version>1.0.0</folio-module-descriptor-validator.version>
<folio-kafka-wrapper.version>3.1.1</folio-kafka-wrapper.version>
<folio-kafka-wrapper.version>3.2.0</folio-kafka-wrapper.version>

<!--Dependency properties-->
<aspectj.version>1.9.22.1</aspectj.version>
Expand Down Expand Up @@ -183,6 +183,30 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>3.6.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.13</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
66 changes: 24 additions & 42 deletions src/main/java/org/folio/service/InvoiceLineStorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,54 +31,36 @@ public class InvoiceLineStorageService {

public void createInvoiceLine(InvoiceLine invoiceLine, Handler<AsyncResult<Response>> asyncResultHandler,
Context vertxContext, Map<String, String> headers) {
try {
vertxContext.runOnContext(v -> {
log.info("createInvoiceLine:: Creating a new invoiceLine by id: {}", invoiceLine.getId());
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceLinesDAO.createInvoiceLine(invoiceLine, conn)
.compose(invoiceLineId -> auditOutboxService.saveInvoiceLineOutboxLog(conn, invoiceLine, InvoiceLineAuditEvent.Action.CREATE, headers)))
.onSuccess(s -> {
log.info("createInvoiceLine:: Successfully created a new invoiceLine by id: {}", invoiceLine.getId());
asyncResultHandler.handle(buildResponseWithLocation(headers.get(OKAPI_URL), INVOICE_LINES_PREFIX + invoiceLine.getId(), invoiceLine));
})
.onFailure(f -> {
log.error("Error occurred while creating a new invoiceLine with id: {}", invoiceLine.getId(), f);
asyncResultHandler.handle(buildErrorResponse(f));
});
log.info("createInvoiceLine:: Creating a new invoiceLine by id: {}", invoiceLine.getId());
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceLinesDAO.createInvoiceLine(invoiceLine, conn)
.compose(invoiceLineId -> auditOutboxService.saveInvoiceLineOutboxLog(conn, invoiceLine, InvoiceLineAuditEvent.Action.CREATE, headers))
.compose(v -> auditOutboxService.processOutboxEventLogs(headers, vertxContext)))
.onSuccess(s -> {
log.info("createInvoiceLine:: Successfully created a new invoiceLine by id: {}", invoiceLine.getId());
asyncResultHandler.handle(buildResponseWithLocation(headers.get(OKAPI_URL), INVOICE_LINES_PREFIX + invoiceLine.getId(), invoiceLine));
})
.onFailure(f -> {
log.error("Error occurred while creating a new invoiceLine with id: {}", invoiceLine.getId(), f);
asyncResultHandler.handle(buildErrorResponse(f));
});
} catch (Exception e) {
log.error("Error occurred while creating a new invoiceLine with id: {}", invoiceLine.getId(), e);
asyncResultHandler.handle(buildErrorResponse(
new HttpException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())
));
}
}

public void updateInvoiceLine(String id, InvoiceLine invoiceLine, Map<String, String> headers,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
try {
vertxContext.runOnContext(v -> {
log.info("updateInvoiceLine:: Updating invoice line with id: {}", id);
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceLinesDAO.updateInvoiceLine(id, invoiceLine, conn)
.compose(invoiceLineId -> auditOutboxService.saveInvoiceLineOutboxLog(conn, invoiceLine, InvoiceLineAuditEvent.Action.EDIT, headers)))
.onSuccess(s -> {
log.info("updateInvoiceLine:: Successfully updated invoice line with id: {}", id);
asyncResultHandler.handle(buildNoContentResponse());
})
.onFailure(f -> {
log.error("Error occurred while updating invoice line with id: {}", id, f);
asyncResultHandler.handle(buildErrorResponse(f));
});
log.info("updateInvoiceLine:: Updating invoice line with id: {}", id);
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceLinesDAO.updateInvoiceLine(id, invoiceLine, conn)
.compose(invoiceLineId -> auditOutboxService.saveInvoiceLineOutboxLog(conn, invoiceLine, InvoiceLineAuditEvent.Action.EDIT, headers))
.compose(v -> auditOutboxService.processOutboxEventLogs(headers, vertxContext)))
.onSuccess(s -> {
log.info("updateInvoiceLine:: Successfully updated invoice line with id: {}", id);
asyncResultHandler.handle(buildNoContentResponse());
})
.onFailure(f -> {
log.error("Error occurred while updating invoice line with id: {}", id, f);
asyncResultHandler.handle(buildErrorResponse(f));
});
} catch (Exception e) {
log.error("Error occurred while updating invoice line with id: {}", id, e);
asyncResultHandler.handle(buildErrorResponse(
new HttpException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())
));
}
}

}
68 changes: 25 additions & 43 deletions src/main/java/org/folio/service/InvoiceStorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,55 +44,37 @@ public class InvoiceStorageService {
private final AuditOutboxService auditOutboxService;

public void postInvoiceStorageInvoices(Invoice invoice, Handler<AsyncResult<Response>> asyncResultHandler,
Context vertxContext, Map<String, String> headers) {
try {
vertxContext.runOnContext(v -> {
log.info("postInvoiceStorageInvoices:: Creating a new invoice by id: {}", invoice.getId());
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceDAO.createInvoice(invoice, conn)
.compose(invoiceId -> auditOutboxService.saveInvoiceOutboxLog(conn, invoice, InvoiceAuditEvent.Action.CREATE, headers)))
.onSuccess(s -> {
log.info("postInvoiceStorageInvoices:: Successfully created a new invoice by id: {}", invoice.getId());
asyncResultHandler.handle(buildResponseWithLocation(headers.get(OKAPI_URL), INVOICE_PREFIX + invoice.getId(), invoice));
})
.onFailure(f -> {
log.error("Error occurred while creating a new invoice with id: {}", invoice.getId(), f);
asyncResultHandler.handle(buildErrorResponse(f));
});
Context vertxContext, Map<String, String> headers) {
log.info("postInvoiceStorageInvoices:: Creating a new invoice by id: {}", invoice.getId());
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceDAO.createInvoice(invoice, conn)
.compose(invoiceId -> auditOutboxService.saveInvoiceOutboxLog(conn, invoice, InvoiceAuditEvent.Action.CREATE, headers))
.compose(v -> auditOutboxService.processOutboxEventLogs(headers, vertxContext)))
.onSuccess(s -> {
log.info("postInvoiceStorageInvoices:: Successfully created a new invoice by id: {}", invoice.getId());
asyncResultHandler.handle(buildResponseWithLocation(headers.get(OKAPI_URL), INVOICE_PREFIX + invoice.getId(), invoice));
})
.onFailure(f -> {
log.error("Error occurred while creating a new invoice with id: {}", invoice.getId(), f);
asyncResultHandler.handle(buildErrorResponse(f));
});
} catch (Exception e) {
log.error("Error occurred while creating a new invoice with id: {}", invoice.getId(), e);
asyncResultHandler.handle(buildErrorResponse(
new HttpException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())
));
}
}

public void putInvoiceStorageInvoicesById(String id, Invoice invoice, Map<String, String> headers,
Handler<AsyncResult<Response>> asyncResultHandler, Context vertxContext) {
try {
vertxContext.runOnContext(v -> {
log.info("putInvoiceStorageInvoicesById:: Updating invoice with id: {}", id);
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceDAO.updateInvoice(id, invoice, conn)
.compose(invoiceId -> auditOutboxService.saveInvoiceOutboxLog(conn, invoice, InvoiceAuditEvent.Action.EDIT, headers)))
.onSuccess(s -> {
log.info("putInvoiceStorageInvoicesById:: Successfully updated invoice with id: {}", id);
asyncResultHandler.handle(buildNoContentResponse());
})
.onFailure(f -> {
log.error("Error occurred while updating invoice with id: {}", id, f);
asyncResultHandler.handle(buildErrorResponse(f));
});
log.info("putInvoiceStorageInvoicesById:: Updating invoice with id: {}", id);
new DBClient(vertxContext, headers).getPgClient()
.withTrans(conn -> invoiceDAO.updateInvoice(id, invoice, conn)
.compose(invoiceId -> auditOutboxService.saveInvoiceOutboxLog(conn, invoice, InvoiceAuditEvent.Action.EDIT, headers))
.compose(v -> auditOutboxService.processOutboxEventLogs(headers, vertxContext)))
.onSuccess(s -> {
log.info("putInvoiceStorageInvoicesById:: Successfully updated invoice with id: {}", id);
asyncResultHandler.handle(buildNoContentResponse());
})
.onFailure(f -> {
log.error("Error occurred while updating invoice with id: {}", id, f);
asyncResultHandler.handle(buildErrorResponse(f));
});
} catch (Exception e) {
log.error("Error occurred while updating invoice with id: {}", invoice.getId(), e);
asyncResultHandler.handle(buildErrorResponse(
new HttpException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
Response.Status.INTERNAL_SERVER_ERROR.getReasonPhrase())
));
}
}

public void deleteInvoiceStorageInvoicesById(String id, Handler<AsyncResult<Response>> asyncResultHandler,
Expand Down
29 changes: 25 additions & 4 deletions src/test/java/org/folio/rest/impl/StorageTestSuite.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.folio.rest.impl;

import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static org.folio.rest.impl.TestBase.TENANT_HEADER;
import static org.folio.rest.utils.TenantApiTestUtil.deleteTenant;
import static org.folio.rest.utils.TenantApiTestUtil.prepareTenant;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -36,6 +36,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxImpl;
import io.vertx.core.json.JsonObject;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;

public class StorageTestSuite {
private static final Logger log = LogManager.getLogger(StorageTestSuite.class);
Expand All @@ -44,6 +45,14 @@ public class StorageTestSuite {
private static final int port = NetworkUtils.nextFreePort();
public static final Header URL_TO_HEADER = new Header("X-Okapi-Url-to", "http://localhost:" + port);
private static TenantJob tenantJob;
public static EmbeddedKafkaCluster kafkaCluster;
public static final String KAFKA_ENV_VALUE = "test-env";
private static final String KAFKA_HOST = "KAFKA_HOST";
private static final String KAFKA_PORT = "KAFKA_PORT";
private static final String KAFKA_ENV = "ENV";
private static final String OKAPI_URL_KEY = "OKAPI_URL";
public static final int mockPort = NetworkUtils.nextFreePort();


private StorageTestSuite() {}

Expand Down Expand Up @@ -83,16 +92,27 @@ private static Context getContextWithReflection(Verticle verticle) {
}

@BeforeAll
public static void before() throws IOException, InterruptedException, ExecutionException, TimeoutException {
public static void before() throws InterruptedException, ExecutionException, TimeoutException {
// tests expect English error messages only, no Danish/German/...
Locale.setDefault(Locale.US);

vertx = Vertx.vertx();

log.info("Start container database");

log.info("Starting kafka cluster");
kafkaCluster = EmbeddedKafkaCluster.provisionWith(defaultClusterConfig());
kafkaCluster.start();
String[] hostAndPort = kafkaCluster.getBrokerList().split(":");
System.setProperty(KAFKA_HOST, hostAndPort[0]);
System.setProperty(KAFKA_PORT, hostAndPort[1]);
System.setProperty(KAFKA_ENV, KAFKA_ENV_VALUE);
System.setProperty(OKAPI_URL_KEY, "http://localhost:" + mockPort);
log.info("Kafka cluster started with host: {}, port: {}, broker list: {}",
System.getProperty(KAFKA_HOST), System.getProperty(KAFKA_PORT), kafkaCluster.getBrokerList());

log.info("Starting container database");
PostgresClient.setPostgresTester(new PostgresTesterContainer());


DeploymentOptions options = new DeploymentOptions();

options.setConfig(new JsonObject().put("http.port", port));
Expand All @@ -106,6 +126,7 @@ public static void before() throws IOException, InterruptedException, ExecutionE
@AfterAll
public static void after() throws InterruptedException, ExecutionException, TimeoutException {
log.info("Delete tenant");
kafkaCluster.stop();
deleteTenant(tenantJob, TENANT_HEADER);

CompletableFuture<String> undeploymentComplete = new CompletableFuture<>();
Expand Down

0 comments on commit a2244e6

Please sign in to comment.