diff --git a/pom.xml b/pom.xml index 7f849e84..e7aeb96c 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ 4.5.10 + 3.6.1 2.24.1 @@ -183,6 +184,11 @@ org.apache.logging.log4j log4j-slf4j-impl + + org.apache.kafka + kafka-clients + ${kafkaclients.version} + net.mguenther.kafka kafka-junit diff --git a/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java b/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java index e9dfc226..df134422 100644 --- a/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java +++ b/src/main/java/org/folio/dao/invoice/InvoicePostgresDAO.java @@ -61,8 +61,8 @@ public Future createInvoice(Invoice invoice, Conn conn) { } return conn.save(INVOICE_TABLE, invoice.getId(), invoice, true) .recover(t -> Future.failedFuture(convertPgExceptionIfNeeded(t))) - .onFailure(t -> log.error("createInvoice failed for invoice with id {}", invoice.getId(), t)) - .onSuccess(s -> log.info("createInvoice:: New invoice with id: '{}' successfully created", invoice.getId())); + .onSuccess(s -> log.info("createInvoice:: New invoice with id: '{}' successfully created", invoice.getId())) + .onFailure(t -> log.error("Failed to create invoice with id: '{}'", invoice.getId(), t)); } @Override @@ -70,7 +70,7 @@ public Future updateInvoice(String id, Invoice invoice, Conn conn) { return conn.update(INVOICE_TABLE, invoice, id) .compose(DbUtils::verifyEntityUpdate) .onSuccess(v -> log.info("updateInvoice:: Invoice with id: '{}' successfully updated", invoice.getId())) - .onFailure(t -> log.error("Update failed for invoice with id {}", invoice.getId(), t)) + .onFailure(t -> log.error("Update failed for invoice with id: '{}'", invoice.getId(), t)) .mapEmpty(); } diff --git a/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java b/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java index 3afbfead..8fc84aa6 100644 --- a/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java +++ b/src/main/java/org/folio/dao/lines/InvoiceLinesPostgresDAO.java @@ -32,7 +32,7 @@ public Future createInvoiceLine(InvoiceLine invoiceLine, Conn conn) { return conn.save(INVOICE_LINE_TABLE, invoiceLine.getId(), invoiceLine, true) .recover(t -> Future.failedFuture(convertPgExceptionIfNeeded(t))) .onSuccess(invoiceLineId -> log.info("createInvoiceLine:: Created invoice line with id: {}", invoiceLineId)) - .onFailure(t -> log.error("Failed to create invoice line: {}", invoiceLine, t)); + .onFailure(t -> log.error("Failed to create invoice line with id: {}", invoiceLine.getId(), t)); } @Override diff --git a/src/main/java/org/folio/rest/utils/ResponseUtils.java b/src/main/java/org/folio/rest/utils/ResponseUtils.java index fdedf4b1..a59b97e7 100644 --- a/src/main/java/org/folio/rest/utils/ResponseUtils.java +++ b/src/main/java/org/folio/rest/utils/ResponseUtils.java @@ -4,6 +4,7 @@ import static javax.ws.rs.core.HttpHeaders.LOCATION; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; import static javax.ws.rs.core.MediaType.TEXT_PLAIN; +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import java.net.URI; @@ -52,7 +53,7 @@ public static void handleFailure(Promise promise, AsyncResult reply) { public static Throwable convertPgExceptionIfNeeded(Throwable cause) { var badRequestMessage = PgExceptionUtil.badRequestMessage(cause); if (badRequestMessage != null) { - return new HttpException(Response.Status.BAD_REQUEST.getStatusCode(), badRequestMessage); + return new HttpException(BAD_REQUEST.getStatusCode(), badRequestMessage); } else { return new HttpException(INTERNAL_SERVER_ERROR.getStatusCode(), cause.getMessage()); } @@ -85,6 +86,10 @@ public static Future buildOkResponse(Object body) { return Future.succeededFuture(Response.ok(body, APPLICATION_JSON).build()); } + public static Future buildBadRequestResponse(String body) { + return Future.succeededFuture(buildErrorResponse(BAD_REQUEST.getStatusCode(), body)); + } + public static Future buildErrorResponse(Throwable throwable) { final String message; final int code; diff --git a/src/main/java/org/folio/service/InvoiceLineStorageService.java b/src/main/java/org/folio/service/InvoiceLineStorageService.java index 3c5a0a58..1dc44bc6 100644 --- a/src/main/java/org/folio/service/InvoiceLineStorageService.java +++ b/src/main/java/org/folio/service/InvoiceLineStorageService.java @@ -1,6 +1,7 @@ package org.folio.service; import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_LINES_PREFIX; +import static org.folio.rest.utils.ResponseUtils.buildBadRequestResponse; import static org.folio.rest.utils.ResponseUtils.buildErrorResponse; import static org.folio.rest.utils.ResponseUtils.buildNoContentResponse; import static org.folio.rest.utils.ResponseUtils.buildResponseWithLocation; @@ -9,6 +10,7 @@ import javax.ws.rs.core.Response; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.folio.dao.lines.InvoiceLinesDAO; import org.folio.rest.jaxrs.model.InvoiceLine; import org.folio.rest.jaxrs.model.InvoiceLineAuditEvent; @@ -18,7 +20,6 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; -import io.vertx.ext.web.handler.HttpException; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -49,6 +50,9 @@ public void createInvoiceLine(InvoiceLine invoiceLine, Handler headers, Handler> asyncResultHandler, Context vertxContext) { log.info("updateInvoiceLine:: Updating invoice line with id: {}", id); + if (StringUtils.isBlank(id)) { + asyncResultHandler.handle(buildBadRequestResponse("Invoice line id is required")); + } new DBClient(vertxContext, headers).getPgClient() .withTrans(conn -> invoiceLinesDAO.updateInvoiceLine(id, invoiceLine, conn) .compose(invoiceLineId -> auditOutboxService.saveInvoiceLineOutboxLog(conn, invoiceLine, InvoiceLineAuditEvent.Action.EDIT, headers)) diff --git a/src/main/java/org/folio/service/InvoiceStorageService.java b/src/main/java/org/folio/service/InvoiceStorageService.java index 43422603..7f6a89cf 100644 --- a/src/main/java/org/folio/service/InvoiceStorageService.java +++ b/src/main/java/org/folio/service/InvoiceStorageService.java @@ -5,6 +5,7 @@ import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_ID_FIELD_NAME; import static org.folio.rest.impl.InvoiceStorageImpl.INVOICE_PREFIX; import static org.folio.rest.utils.HelperUtils.combineCqlExpressions; +import static org.folio.rest.utils.ResponseUtils.buildBadRequestResponse; import static org.folio.rest.utils.ResponseUtils.buildOkResponse; import static org.folio.rest.utils.ResponseUtils.buildErrorResponse; import static org.folio.rest.utils.ResponseUtils.buildNoContentResponse; @@ -63,6 +64,9 @@ public void postInvoiceStorageInvoices(Invoice invoice, Handler headers, Handler> asyncResultHandler, Context vertxContext) { log.info("putInvoiceStorageInvoicesById:: Updating invoice with id: {}", id); + if (StringUtils.isBlank(id)) { + asyncResultHandler.handle(buildBadRequestResponse("Invoice id is required")); + } new DBClient(vertxContext, headers).getPgClient() .withTrans(conn -> invoiceDAO.updateInvoice(id, invoice, conn) .compose(invoiceId -> auditOutboxService.saveInvoiceOutboxLog(conn, invoice, InvoiceAuditEvent.Action.EDIT, headers)) diff --git a/src/main/java/org/folio/service/audit/AuditEventProducer.java b/src/main/java/org/folio/service/audit/AuditEventProducer.java index fb94ef54..33d401a0 100644 --- a/src/main/java/org/folio/service/audit/AuditEventProducer.java +++ b/src/main/java/org/folio/service/audit/AuditEventProducer.java @@ -57,7 +57,7 @@ public Future sendInvoiceLineEvent(InvoiceLine invoiceLine, InvoiceLineAud var event = getAuditEvent(invoiceLine, eventAction); log.info("sendInvoiceLineEvent:: Sending event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId()); return sendToKafka(EventTopic.ACQ_INVOICE_LINE_CHANGED, event.getInvoiceLineId(), event, okapiHeaders) - .onFailure(t -> log.warn("sendInvoiceLineEvent:: Failed to send event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId(), t)); + .onFailure(t -> log.error("sendInvoiceLineEvent:: Failed to send event with id: {} and invoiceLineId: {} to Kafka", event.getId(), invoiceLine.getId(), t)); } private InvoiceAuditEvent getAuditEvent(Invoice invoice, InvoiceAuditEvent.Action eventAction) { @@ -94,11 +94,10 @@ private Future sendToKafka(EventTopic eventTopic, String key, Object event var producerManager = new SimpleKafkaProducerManager(Vertx.currentContext().owner(), kafkaConfig); KafkaProducer producer = producerManager.createShared(topicName); - log.debug("sendToKafka:: Sending event for {} with id '{}' to kafka topic '{}' with url: '{}'", eventTopic, key, topicName, kafkaConfig.getKafkaUrl()); return producer.send(kafkaProducerRecord) - .onComplete(reply -> producer.end(ear -> producer.close())) .onSuccess(s -> log.info("sendToKafka:: Event for {} with id '{}' has been sent to kafka topic '{}'", eventTopic, key, topicName)) .onFailure(t -> log.error("Failed to send event for {} with id '{}' to kafka topic '{}'", eventTopic, key, topicName, t)) + .onComplete(reply -> producer.end(v -> producer.close())) .mapEmpty(); } diff --git a/src/test/java/org/folio/rest/impl/StorageTestSuite.java b/src/test/java/org/folio/rest/impl/StorageTestSuite.java index bed59f4d..1f3bdc3e 100644 --- a/src/test/java/org/folio/rest/impl/StorageTestSuite.java +++ b/src/test/java/org/folio/rest/impl/StorageTestSuite.java @@ -111,12 +111,9 @@ public static void before() throws InterruptedException, ExecutionException, Tim log.info("Starting container database"); PostgresClient.setPostgresTester(new PostgresTesterContainer()); - DeploymentOptions options = new DeploymentOptions(); - options.setConfig(new JsonObject().put("http.port", port)); options.setWorker(true); - startVerticle(options); tenantJob = prepareTenant(TENANT_HEADER, false, false);