Skip to content

Commit

Permalink
feat: Kafka trans
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenzo-ingenito committed Sep 24, 2024
1 parent 51e23d7 commit e9d93ec
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.Size;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -83,7 +84,6 @@
import it.finanze.sanita.fse2.ms.gtw.dispatcher.dto.response.PublicationResDTO;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.dto.response.ResponseWifDTO;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.ActivityEnum;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.DestinationTypeEnum;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.ErrorInstanceEnum;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.EventStatusEnum;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.EventTypeEnum;
Expand Down Expand Up @@ -218,9 +218,11 @@ private void postExecutionCreate(final Date startDateOperation, final LogTraceIn
kafkaValue.setIdDoc(idDoc);
kafkaValue.setEdsDPOperation(ProcessorOperationEnum.PUBLISH);

kafkaSRV.notifyChannel(idDoc, new Gson().toJson(kafkaValue), priorityType, validationInfo.getJsonObj().getTipoDocumentoLivAlto(), DestinationTypeEnum.INDEXER);
kafkaSRV.sendPublicationStatus(traceInfoDTO.getTraceID(), validationInfo.getValidationData().getWorkflowInstanceId(), SUCCESS, null, validationInfo.getJsonObj(), validationInfo.getJwtPayloadToken());


kafkaSRV.notifyIndexerAndStatusInTransaction(idDoc, new Gson().toJson(kafkaValue), priorityType,
validationInfo.getJsonObj().getTipoDocumentoLivAlto(), traceInfoDTO.getTraceID(), validationInfo.getValidationData().getWorkflowInstanceId(),
SUCCESS, null, validationInfo.getJsonObj(), validationInfo.getJwtPayloadToken(),EventTypeEnum.PUBLICATION);

logger.info(Constants.App.LOG_TYPE_CONTROL,validationInfo.getValidationData().getWorkflowInstanceId(),String.format("Publication CDA completed for workflow instance id %s", validationInfo.getValidationData().getWorkflowInstanceId()), OperationLogEnum.PUB_CDA2, ResultLogEnum.OK, startDateOperation, getDocumentType(validationInfo.getDocument()), validationInfo.getJwtPayloadToken(),null);
}

Expand Down Expand Up @@ -256,8 +258,11 @@ public ResponseEntity<PublicationResDTO> replace(final String idDoc, final Publi
kafkaValue.setIdDoc(idDoc);
kafkaValue.setEdsDPOperation(ProcessorOperationEnum.REPLACE);

kafkaSRV.notifyChannel(idDoc, new Gson().toJson(kafkaValue), PriorityTypeEnum.LOW, validationInfo.getJsonObj().getTipoDocumentoLivAlto(), DestinationTypeEnum.INDEXER);
kafkaSRV.sendReplaceStatus(traceInfoDTO.getTraceID(), validationInfo.getValidationData().getWorkflowInstanceId(), SUCCESS, null, validationInfo.getJsonObj(), validationInfo.getJwtPayloadToken());

kafkaSRV.notifyIndexerAndStatusInTransaction(idDoc, new Gson().toJson(kafkaValue), PriorityTypeEnum.LOW,
validationInfo.getJsonObj().getTipoDocumentoLivAlto(), traceInfoDTO.getTraceID(), validationInfo.getValidationData().getWorkflowInstanceId(),
SUCCESS, null, validationInfo.getJsonObj(), validationInfo.getJwtPayloadToken(), EventTypeEnum.REPLACE);


logger.info(Constants.App.LOG_TYPE_CONTROL,validationInfo.getValidationData().getWorkflowInstanceId(),String.format("Replace CDA completed for workflow instance id %s", validationInfo.getValidationData().getWorkflowInstanceId()), OperationLogEnum.REPLACE_CDA2, ResultLogEnum.OK, startDateOperation,
getDocumentType(validationInfo.getDocument()), validationInfo.getJwtPayloadToken(),null);
Expand Down Expand Up @@ -403,7 +408,9 @@ private ValidationCreationInputDTO publicationAndReplaceValidation(final Multipa
try {
simulatedResult = accreditamentoSimulationSRV.runSimulation(idToCheck, bytePDF, isReplace ? EventTypeEnum.REPLACE : EventTypeEnum.PUBLICATION);
} catch(NoRecordFoundException noRecordFound) {
kafkaSRV.sendReplaceStatus(traceInfoDTO.getTraceID(), "", EventStatusEnum.BLOCKING_ERROR, "Id documento non presente", jsonObj, jwtPayloadToken);
// kafkaSRV.sendReplaceStatus(traceInfoDTO.getTraceID(), "", EventStatusEnum.BLOCKING_ERROR, "Id documento non presente", jsonObj, jwtPayloadToken);
ProducerRecord<String,String> status = kafkaSRV.getStatus(traceInfoDTO.getTraceID(), "", EventStatusEnum.BLOCKING_ERROR, "Id documento non presente", jsonObj, jwtPayloadToken, eventTypeEnum);
kafkaSRV.kafkaSendNonInTransaction(status);
throw noRecordFound;
}

Expand Down Expand Up @@ -741,9 +748,10 @@ public ResponseEntity<PublicationResDTO> validateAndReplace(
kafkaValue.setIdDoc(idDoc);
kafkaValue.setEdsDPOperation(ProcessorOperationEnum.REPLACE);

kafkaSRV.notifyChannel(idDoc, new Gson().toJson(kafkaValue), PriorityTypeEnum.LOW, validationResult.getJsonObj().getTipoDocumentoLivAlto(), DestinationTypeEnum.INDEXER);
kafkaSRV.sendReplaceStatus(traceInfoDTO.getTraceID(), validationResult.getValidationData().getWorkflowInstanceId(), SUCCESS, null, validationResult.getJsonObj(), validationResult.getJwtPayloadToken());

kafkaSRV.notifyIndexerAndStatusInTransaction(idDoc, new Gson().toJson(kafkaValue), PriorityTypeEnum.LOW,
validationResult.getJsonObj().getTipoDocumentoLivAlto(),
traceInfoDTO.getTraceID(), validationResult.getValidationData().getWorkflowInstanceId(), SUCCESS, null, validationResult.getJsonObj(), validationResult.getJwtPayloadToken(), EventTypeEnum.REPLACE);

logger.info(Constants.App.LOG_TYPE_CONTROL,validationResult.getValidationData().getWorkflowInstanceId(),String.format("Replace CDA completed for workflow instance id %s", validationResult.getValidationData().getWorkflowInstanceId()), OperationLogEnum.REPLACE_CDA2, ResultLogEnum.OK, startDateReplacenOperation,
getDocumentType(validationResult.getDocument()), validationResult.getJwtPayloadToken(),null);
} catch (ConnectionRefusedException ce) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,42 @@
*/
package it.finanze.sanita.fse2.ms.gtw.dispatcher.service;

import org.apache.kafka.clients.producer.ProducerRecord;

import it.finanze.sanita.fse2.ms.gtw.dispatcher.dto.JWTPayloadDTO;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.dto.request.PublicationCreateReplaceMetadataDTO;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.*;
import org.apache.kafka.clients.producer.RecordMetadata;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.EventStatusEnum;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.EventTypeEnum;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.PriorityTypeEnum;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.enums.TipoDocAltoLivEnum;

/**
* Interface for service used to handle kafka communications
*/
public interface IKafkaSRV {

/**
 * Send a single message over the given kafka topic.
 * @param topic destination topic name
 * @param key   record key
 * @param value record payload (serialized JSON in this codebase's callers)
 * @param trans presumably whether the send runs inside a kafka transaction — confirm against the implementation
 * @return metadata of the acknowledged record
 */
RecordMetadata sendMessage(String topic, String key, String value, boolean trans);

/**
 * Send message to either indexer or publisher over kafka topic, choosing priority.
 * NOTE(review): callers in this commit replace this with
 * {@link #notifyIndexerAndStatusInTransaction} — this declaration may belong to the
 * pre-change version of the interface; verify which side of the diff it is on.
 * @param key record key (document id in the visible callers)
 * @param value serialized JSON payload
 * @param priorityType priority used to pick the destination topic
 * @param documentType high-level document type
 * @param destinationTypeEnum target channel (indexer or publisher)
 */
void notifyChannel(String key, String value, PriorityTypeEnum priorityType, TipoDocAltoLivEnum documentType, DestinationTypeEnum destinationTypeEnum);

/**
 * Sends a pre-built record OUTSIDE of any kafka transaction.
 * Visible callers build the record via {@link #getStatus} for error paths
 * (e.g. blocking errors, connection-refused handling) where a transaction is not wanted.
 * @param producerRecord record to send as-is
 */
void kafkaSendNonInTransaction(ProducerRecord<String, String> producerRecord);
/** Publishes a validation status event for the given workflow instance. */
void sendValidationStatus(String traceId, String workflowInstanceId, EventStatusEnum eventStatus, String message,JWTPayloadDTO jwtClaimDTO);
/** Overload that also tags the status event with an explicit {@code EventTypeEnum}. */
void sendValidationStatus(String traceId,String workflowInstanceId, EventStatusEnum eventStatus, String message,
JWTPayloadDTO jwtClaimDTO, EventTypeEnum eventTypeEnum);
/** Publishes a publication status event for the given workflow instance. */
void sendPublicationStatus(String traceId, String workflowInstanceId, EventStatusEnum eventStatus, String message, PublicationCreateReplaceMetadataDTO req, JWTPayloadDTO jwtClaimDTO);

/** Publishes a replace status event for the given workflow instance. */
void sendReplaceStatus(String traceId, String workflowInstanceId, EventStatusEnum eventStatus, String message, PublicationCreateReplaceMetadataDTO req, JWTPayloadDTO jwtClaimDTO);

/**
 * Builds — without sending — the status-event record for the given operation outcome.
 * The caller decides how to dispatch it (visible callers pass the result to
 * {@link #kafkaSendNonInTransaction}).
 * @return the record ready to be produced
 */
ProducerRecord<String, String> getStatus(String traceId, String workflowInstanceId,
EventStatusEnum eventStatus, String message,
PublicationCreateReplaceMetadataDTO publicationReq, JWTPayloadDTO jwtClaimDTO,
EventTypeEnum eventTypeEnum);

/** Publishes a delete status event for the given document. */
void sendDeleteStatus(String traceId, String workflowInstanceId, String idDoc, String message, EventStatusEnum eventStatus, JWTPayloadDTO jwt, EventTypeEnum eventType);
/** Forwards a delete request payload for the given workflow instance. */
void sendDeleteRequest(String workflowInstanceId, Object request);

/** Publishes an update status event for the given document. */
void sendUpdateStatus(String traceId, String workflowInstanceId, String idDoc, EventStatusEnum eventStatus, JWTPayloadDTO jwt, String message,EventTypeEnum event);

/** Forwards an update request payload for the given workflow instance. */
void sendUpdateRequest(String workflowInstanceId, Object request);

/**
 * Sends the indexer notification AND the corresponding status event together,
 * presumably within a single kafka transaction (the name and this commit's title
 * suggest atomicity — confirm against the implementation). Introduced in this commit
 * to replace the separate {@code notifyChannel} + {@code send*Status} pair used by the
 * publication, replace and validate-and-replace flows.
 * @param key record key (document id in the visible callers)
 * @param kafkaValue serialized JSON payload for the indexer
 * @param priorityFromRequest priority used to pick the destination topic
 * @param documentType high-level document type
 * @param traceId trace identifier propagated into the status event
 * @param workflowInstanceId workflow instance the status event refers to
 * @param eventStatus outcome status (e.g. SUCCESS) of the operation
 * @param message optional human-readable detail, may be null
 * @param publicationReq original publication/replace request metadata
 * @param jwtClaimDTO caller's JWT claims, propagated into the status event
 * @param eventTypeEnum operation type (PUBLICATION or REPLACE in the visible callers)
 */
void notifyIndexerAndStatusInTransaction(final String key, final String kafkaValue, PriorityTypeEnum priorityFromRequest,
TipoDocAltoLivEnum documentType,
String traceId, String workflowInstanceId,
EventStatusEnum eventStatus, String message,
PublicationCreateReplaceMetadataDTO publicationReq, JWTPayloadDTO jwtClaimDTO,
EventTypeEnum eventTypeEnum);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import it.finanze.sanita.fse2.ms.gtw.dispatcher.service.IErrorHandlerSRV;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.service.IKafkaSRV;
import it.finanze.sanita.fse2.ms.gtw.dispatcher.utility.StringUtility;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand Down Expand Up @@ -59,8 +61,10 @@ public void connectionRefusedExceptionHandler(

EventStatusEnum errorEventStatus = RestExecutionResultEnum.GENERIC_TIMEOUT.getEventStatusEnum();

kafkaSRV.sendPublicationStatus(traceInfoDTO.getTraceID(), validationInfo.getWorkflowInstanceId(), errorEventStatus,
errorMessage, jsonObj, jwtPayloadToken);
EventTypeEnum eventType = isPublication ? EventTypeEnum.PUBLICATION : EventTypeEnum.REPLACE;
ProducerRecord<String, String> status = kafkaSRV.getStatus(traceInfoDTO.getTraceID(), validationInfo.getWorkflowInstanceId(), errorEventStatus,
errorMessage, jsonObj, jwtPayloadToken, eventType);
kafkaSRV.kafkaSendNonInTransaction(status);

final RestExecutionResultEnum errorType = RestExecutionResultEnum.get(capturedErrorType);

Expand Down Expand Up @@ -96,14 +100,11 @@ public void publicationValidationExceptionHandler(
errorEventStatus = RestExecutionResultEnum.get(capturedErrorType).getEventStatusEnum();
}

if(isPublication) {
kafkaSRV.sendPublicationStatus(traceInfoDTO.getTraceID(), validationInfo.getWorkflowInstanceId(), errorEventStatus,
errorMessage, jsonObj, jwtPayloadToken);
} else {
kafkaSRV.sendReplaceStatus(traceInfoDTO.getTraceID(), validationInfo.getWorkflowInstanceId(), errorEventStatus,
errorMessage, jsonObj, jwtPayloadToken);
}

EventTypeEnum eventType = isPublication ? EventTypeEnum.PUBLICATION : EventTypeEnum.REPLACE;
ProducerRecord<String, String> status = kafkaSRV.getStatus(traceInfoDTO.getTraceID(), validationInfo.getWorkflowInstanceId(), errorEventStatus,
errorMessage, jsonObj, jwtPayloadToken, eventType);
kafkaSRV.kafkaSendNonInTransaction(status);

final RestExecutionResultEnum errorType = RestExecutionResultEnum.get(capturedErrorType);

Expand Down
Loading

0 comments on commit e9d93ec

Please sign in to comment.