Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumption side changes #313

Merged
merged 10 commits into from
Dec 31, 2024
2 changes: 1 addition & 1 deletion src/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ DEFAULT_DATA_MANAGERS="program_manager,program_designer"
DEFAULT_ROLLOUT_ROLES='rollout_manager'

#kafka topic for publishing rollout
ROLLOUT_PUBLISH_KAFKA_TOPIC='dev.rolloutpublishtopic'
ROLLOUT_PUBLISH_KAFKA_TOPIC='dev.rolloutpublish'

#API for create the project template in consumption
PROJECT_PUBLISH_END_POINT=v1/scp/publishTemplateAndTasks
Expand Down
10 changes: 8 additions & 2 deletions src/configs/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ module.exports = async () => {
const subscribeToConsumer = async () => {
try {
await consumer.subscribe({
topics: [process.env.CLEAR_INTERNAL_CACHE, process.env.PROJECT_PUBLISH_KAFKA_TOPIC],
topics: [
process.env.CLEAR_INTERNAL_CACHE,
process.env.PROJECT_PUBLISH_KAFKA_TOPIC,
process.env.ROLLOUT_PUBLISH_KAFKA_TOPIC,
],
})
logger.info(
`Subscribed to topics: ${process.env.CLEAR_INTERNAL_CACHE} and ${process.env.PROJECT_PUBLISH_KAFKA_TOPIC}`
`Subscribed to topics: ${process.env.CLEAR_INTERNAL_CACHE} , ${process.env.PROJECT_PUBLISH_KAFKA_TOPIC} and ${process.env.ROLLOUT_PUBLISH_KAFKA_TOPIC}`
)
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
Expand All @@ -75,6 +79,8 @@ module.exports = async () => {
utils.internalDel(streamingData)
} else if (topic == process.env.PROJECT_PUBLISH_KAFKA_TOPIC) {
await consumptionService.publishProjectTemplates(streamingData)
} else if (topic == process.env.ROLLOUT_PUBLISH_KAFKA_TOPIC) {
await consumptionService.publishProgram(streamingData)
}
} catch (error) {
logger.error('Error processing Kafka message:', { error })
Expand Down
7 changes: 6 additions & 1 deletion src/envVariables.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,12 @@ let environmentVariables = {
ROLLOUT_PUBLISH_KAFKA_TOPIC: {
message: 'Default Kafka topic for rollout publish required',
adithyadinesh0412 marked this conversation as resolved.
Show resolved Hide resolved
optional: true,
default: 'dev.rolloutpublishtopic',
default: 'dev.rolloutpublish',
requiredIf: {
key: 'CONSUMPTION_SERVICE_BASE_URL',
operator: 'NOT_EQUALS',
value: 'self',
},
},
CONSUMPTION_SERVICE_BASE_URL: {
message: 'Consumption service base name required',
Expand Down
20 changes: 20 additions & 0 deletions src/generics/kafka-communication.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,28 @@ const pushResourceToKafka = async (message, resourceType) => {
throw error
}
}
const pushRolloutToKafka = async (message, resourceType) => {
try {
let topic = process.env.ROLLOUT_PUBLISH_KAFKA_TOPIC

if (!topic) {
console.log('Publishing rollout topic not fount.')
return
}

const payload = {
topic: topic,
messages: [{ value: JSON.stringify(message) }],
}

return await pushPayloadToKafka(payload)
} catch (error) {
throw error
}
}

module.exports = {
clearInternalCache,
pushResourceToKafka,
pushRolloutToKafka,
}
16 changes: 16 additions & 0 deletions src/generics/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,21 @@ function convertDuration(durationObj) {
}
}

/**
* Converts charachters which can cause issues in xml file to accepted values
* name escapeXml
* @param {String} inputElement - The input duration object.
* @returns {String} - Converted xml with accepted charecters.
*/
const escapeXml = (inputElement) => {
adithyadinesh0412 marked this conversation as resolved.
Show resolved Hide resolved
return inputElement
.replace(/&/g, '&')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&apos;')
}

module.exports = {
composeEmailBody,
internalSet,
Expand Down Expand Up @@ -761,4 +776,5 @@ module.exports = {
formatKeywords,
formatProjectMetaInformation,
convertDuration,
escapeXml,
}
Loading