Merge pull request #31 from krgauraw/migr-job
IQ-476: code changes for quml migration
sajeshkayyath authored Aug 4, 2023
2 parents b26d45e + ecbe952 commit 84156eb
Showing 36 changed files with 2,104 additions and 40 deletions.
@@ -74,4 +74,10 @@ class ObjectDefinition(val objectType: String, val version: String, val schema:
val oneOfProps: List[String] = if (config.contains("oneOfProps")) config.getOrElse("oneOfProps", List()).asInstanceOf[List[String]] else List()
oneOfProps
}
def getPropsType(propNames: List[String]): Map[String, String] = {
if (schema.isEmpty || propNames.isEmpty) Map() else {
val properties = schema.getOrElse("properties", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
properties.filter(prop => propNames.contains(prop._1)).map(prop => (prop._1, prop._2.asInstanceOf[Map[String, AnyRef]].getOrElse("type", "").asInstanceOf[String]))
}
}
}
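For orientation, a minimal usage sketch of the new getPropsType accessor, written against a schema shaped like the fixture in the new spec below (the empty config map is a placeholder, since getPropsType only reads the schema):

val schema = Map[String, AnyRef](
  "properties" -> Map[String, AnyRef](
    "instructions" -> Map("type" -> "object"),
    "outcomeDeclaration" -> Map("type" -> "object")
  )
)
// Empty config map: getPropsType does not consult the config.
val definition = new ObjectDefinition("QuestionSet", "1.0", schema, Map[String, AnyRef]())
definition.getPropsType(List("instructions", "outcomeDeclaration"))
// => Map("instructions" -> "object", "outcomeDeclaration" -> "object")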
@@ -0,0 +1,68 @@
package org.sunbird.spec

import org.apache.commons.lang.StringUtils
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatestplus.mockito.MockitoSugar
import org.sunbird.job.domain.`object`.ObjectDefinition

class ObjectDefinitionSpec extends FlatSpec with BeforeAndAfterAll with Matchers with MockitoSugar {

val schema = Map[String, AnyRef]("properties" -> Map[String, AnyRef]("instructions" -> Map("type"-> "object"), "outcomeDeclaration"-> Map("type"->"object")))
val config = Map[String, AnyRef]("oneOfProps"-> List("body","answer"), "external" -> Map("tableName" -> "questionset_hierarchy", "properties" -> Map("hierarchy"->Map("type" -> "string"), "instructions"->Map("type" -> "string"), "outcomeDeclaration"->Map("type" -> "string")), "primaryKey" -> List("identifier")))

val qSchema = Map[String, AnyRef]("properties" -> Map[String, AnyRef]("instructions" -> Map("type" -> "object"), "outcomeDeclaration" -> Map("type" -> "object")))

val qConfig = Map[String, AnyRef]("oneOfProps" -> List("body", "answer"), "external" -> Map("tableName" -> "question_data", "properties" -> Map("body" -> Map("type" -> "string"), "instructions" -> Map("type" -> "string"), "outcomeDeclaration" -> Map("type" -> "string")), "primaryKey" -> List("identifier")))

"getPropsType" should "return property and its type in map format " in {
val objDef = new ObjectDefinition("QuestionSet","1.0", schema, config)
val output = objDef.getPropsType(List("instructions", "outcomeDeclaration"))
assert(output.nonEmpty)
assert(output.size==2)
assert(StringUtils.equalsIgnoreCase(output.getOrElse("instructions", "").asInstanceOf[String],"object"))
assert(StringUtils.equalsIgnoreCase(output.getOrElse("outcomeDeclaration", "").asInstanceOf[String],"object"))
}

"getOneOfProps" should "return list of props having oneOf type" in {
val objDef = new ObjectDefinition("Question", "1.0", qSchema, qConfig)
val output = objDef.getOneOfProps()
assert(output.nonEmpty)
assert(output.size == 2)
assert(output.contains("body"))
assert(output.contains("answer"))
}

"getJsonProps" should "return list of props having object or array type" in {
val objDef = new ObjectDefinition("Question", "1.0", qSchema, qConfig)
val output = objDef.getJsonProps()
assert(output.nonEmpty)
assert(output.size == 2)
assert(output.contains("instructions"))
assert(output.contains("outcomeDeclaration"))
}

"getExternalTable" should "return table name" in {
val objDef = new ObjectDefinition("Question", "1.0", qSchema, qConfig)
val output: String = objDef.getExternalTable()
assert(output.nonEmpty)
assert(StringUtils.equalsIgnoreCase("question_data", output))
}

"getExternalProps" should "return external property and its type in map format" in {
val objDef = new ObjectDefinition("Question", "1.0", qSchema, qConfig)
val output = objDef.getExternalProps()
assert(output.nonEmpty)
assert(output.size == 3)
assert(output.contains("body"))
assert(output.contains("instructions"))
assert(output.contains("outcomeDeclaration"))
}

"getExternalTable" should "primary key of external table" in {
val objDef = new ObjectDefinition("Question", "1.0", qSchema, qConfig)
val output = objDef.getExternalPrimaryKey()
assert(output.nonEmpty)
assert(output.size==1)
assert(output.contains("identifier"))
}
}
6 changes: 6 additions & 0 deletions jobs-distribution/pom.xml
@@ -32,6 +32,12 @@
<version>1.0.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>quml-migrator</artifactId>
<version>1.0.0</version>
<type>jar</type>
</dependency>
</dependencies>

<build>
11 changes: 11 additions & 0 deletions kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml
@@ -53,6 +53,13 @@ flink_job_names:
taskmanager_memory: 4096m
taskslots: 1
cpu_requests: 0.3
quml-migrator:
job_class_name: 'org.sunbird.job.quml.migrator.task.QumlMigratorStreamTask'
replica: 1
jobmanager_memory: 2048m
taskmanager_memory: 2048m
taskslots: 1
cpu_requests: 0.3

### Global vars
cloud_store: azure
@@ -81,3 +88,7 @@ valid_cloudstorage_base_urls: '["https://sunbirddevbbpublic.blob.core.windows.ne
### QuestionSet RePublish Job Config
inquiry_assessment_republish_kafka_topic_name: "{{ env_name }}.assessment.republish.request"
inquiry_assessment_republish_group: "{{ env_name }}-questionset-republish-group"

### quml-migrator config
inquiry_quml_migrator_kafka_topic_name: "{{ env_name }}.quml.migration.job.request"
inquiry_quml_migrator_group: "{{ env_name }}-quml-migrator-group"
30 changes: 30 additions & 0 deletions kubernetes/helm_charts/datapipeline_jobs/values.j2
@@ -204,4 +204,34 @@ questionset-republish:
taskmanager.numberOfTaskSlots: {{ flink_job_names['questionset-republish'].taskslots }}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1

quml-migrator:
quml-migrator: |+
include file("/data/flink/conf/base-config.conf")
kafka {
input.topic = "{{ inquiry_quml_migrator_kafka_topic_name }}"
republish.topic = "{{ inquiry_assessment_republish_kafka_topic_name }}"
groupId = "{{ inquiry_quml_migrator_group }}"
}
task {
consumer.parallelism = 1
parallelism = 1
router.parallelism = 1
}
question {
keyspace = "{{ question_keyspace_name }}"
table = "question_data"
}
questionset {
keyspace = "{{ hierarchy_keyspace_name }}"
table = "questionset_hierarchy"
}

flink-conf: |+
jobmanager.memory.flink.size: {{ flink_job_names['quml-migrator'].jobmanager_memory }}
taskmanager.memory.flink.size: {{ flink_job_names['quml-migrator'].taskmanager_memory }}
taskmanager.numberOfTaskSlots: {{ flink_job_names['quml-migrator'].taskslots }}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1
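The block above is rendered into a HOCON file that the migrator task loads alongside base-config.conf (see the include line). A small sketch of reading such keys with Typesafe Config; the rendered values shown here are hypothetical stand-ins for the Jinja variables:

import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical rendered snippet; actual topic, group and keyspace values come from the environment.
val rendered: Config = ConfigFactory.parseString(
  """
    |kafka {
    |  input.topic = "dev.quml.migration.job.request"
    |  groupId = "dev-quml-migrator-group"
    |}
    |question {
    |  keyspace = "dev_question_store"
    |  table = "question_data"
    |}
  """.stripMargin)

rendered.getString("kafka.input.topic")  // "dev.quml.migration.job.request"
rendered.getString("question.table")     // "question_data"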
3 changes: 2 additions & 1 deletion pom.xml
@@ -20,7 +20,8 @@
<modules>
<module>jobs-core</module>
<module>publish-pipeline</module>
</modules>
<module>quml-migrator</module>
</modules>

<properties>
<!-- maven specific properties -->
@@ -8,7 +8,7 @@ import scala.collection.JavaConverters._

class Event(eventMap: java.util.Map[String, Any], partition: Int, offset: Long) extends JobRequest(eventMap, partition, offset) {

private val jobName = "questionset-publish"
private val jobName = "questionset-republish"

private val objectTypes = List("Question","QuestionSet")
private val mimeTypes = List("application/vnd.sunbird.question", "application/vnd.sunbird.questionset")
@@ -32,6 +32,8 @@ class Event(eventMap: java.util.Map[String, Any], partition: Int, offset: Long)
pkgVersion.toDouble
}

def schemaVersion: String = readOrDefault[String]("edata.metadata.schemaVersion", "1.0")

def validEvent(): Boolean = {
(StringUtils.equals("republish", action) && StringUtils.isNotBlank(objectId)) && (objectTypes.contains(objectType) && mimeTypes.contains(mimeType) && pkgVersion > 0)
}
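For context, the new schemaVersion accessor reads edata.metadata.schemaVersion and falls back to "1.0" for older events. An illustrative payload, assuming readOrDefault walks dot-separated paths over the nested event maps as the existing accessors in this class do (the identifier value is hypothetical):

import scala.collection.JavaConverters._

val eventMap: java.util.Map[String, Any] = Map[String, Any](
  "edata" -> Map[String, Any](
    "action" -> "republish",
    "metadata" -> Map[String, Any](
      "identifier" -> "do_1234",    // hypothetical identifier
      "schemaVersion" -> "1.1"
    ).asJava
  ).asJava
).asJava

val event = new Event(eventMap, 0, 0L)
event.schemaVersion  // "1.1"; an event without the field yields the default "1.0"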
@@ -1,3 +1,3 @@
package org.sunbird.job.questionset.republish.domain

case class PublishMetadata(identifier: String, objectType: String, mimeType: String, pkgVersion: Double, publishType: String, lastPublishedBy: String)
case class PublishMetadata(identifier: String, objectType: String, mimeType: String, pkgVersion: Double, publishType: String, lastPublishedBy: String, schemaVersion: String = "1.0")
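Because the new field defaults to "1.0", existing call sites that omit it keep their previous behaviour, while the router below forwards the event's schemaVersion explicitly. A quick sketch (identifier and lastPublishedBy values are illustrative):

val legacy = PublishMetadata("do_1234", "Question", "application/vnd.sunbird.question", 2.0, "republish", "system")
legacy.schemaVersion                                // "1.0" via the default
val migrated = legacy.copy(schemaVersion = "1.1")   // events that carry schemaVersion propagate it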
@@ -35,11 +35,11 @@ class PublishEventRouter(config: QuestionSetRePublishConfig) extends BaseProcess
event.objectType match {
case "Question" => {
logger.info("PublishEventRouter :: Sending Question For RePublish Having Identifier: " + event.objectId)
context.output(config.questionRePublishOutTag, PublishMetadata(event.objectId, event.objectType, event.mimeType, event.pkgVersion, event.publishType, event.lastPublishedBy))
context.output(config.questionRePublishOutTag, PublishMetadata(event.objectId, event.objectType, event.mimeType, event.pkgVersion, event.publishType, event.lastPublishedBy, event.schemaVersion))
}
case "QuestionSet" => {
logger.info("PublishEventRouter :: Sending QuestionSet For RePublish Having Identifier: " + event.objectId)
context.output(config.questionSetRePublishOutTag, PublishMetadata(event.objectId, event.objectType, event.mimeType, event.pkgVersion, event.publishType, event.lastPublishedBy))
context.output(config.questionSetRePublishOutTag, PublishMetadata(event.objectId, event.objectType, event.mimeType, event.pkgVersion, event.publishType, event.lastPublishedBy, event.schemaVersion))
}
case _ => {
metrics.incCounter(config.skippedEventCount)
@@ -6,18 +6,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.slf4j.LoggerFactory
import org.sunbird.job.domain.`object`.DefinitionCache
import org.sunbird.job.domain.`object`.{DefinitionCache, ObjectDefinition}
import org.sunbird.job.publish.core.{DefinitionConfig, ExtDataConfig, ObjectData}
import org.sunbird.job.publish.helpers.EcarPackageType
import org.sunbird.job.questionset.republish.domain.PublishMetadata
import org.sunbird.job.questionset.republish.helpers.QuestionPublisher
import org.sunbird.job.questionset.republish.task.QuestionSetRePublishConfig
import org.sunbird.job.util.{CassandraUtil, CloudStorageUtil, HttpUtil, Neo4JUtil}
import org.sunbird.job.{BaseProcessFunction, Metrics}

import java.lang.reflect.Type
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.commons.lang3.StringUtils
import org.sunbird.job.cache.{DataCache, RedisConnect}

@@ -34,7 +34,6 @@ class QuestionRePublishFunction(config: QuestionSetRePublishConfig, httpUtil: Ht

private[this] val logger = LoggerFactory.getLogger(classOf[QuestionRePublishFunction])
val mapType: Type = new TypeToken[java.util.Map[String, AnyRef]]() {}.getType
private val readerConfig = ExtDataConfig(config.questionKeyspaceName, config.questionTableName)

@transient var ec: ExecutionContext = _
@transient var cache: DataCache = _
@@ -65,6 +64,8 @@ class QuestionRePublishFunction(config: QuestionSetRePublishConfig, httpUtil: Ht
override def processElement(data: PublishMetadata, context: ProcessFunction[PublishMetadata, String]#Context, metrics: Metrics): Unit = {
logger.info("Question publishing started for : " + data.identifier)
metrics.incCounter(config.questionRePublishEventCount)
val definition: ObjectDefinition = definitionCache.getDefinition(data.objectType, data.schemaVersion, config.definitionBasePath)
val readerConfig = ExtDataConfig(config.questionKeyspaceName, definition.getExternalTable, definition.getExternalPrimaryKey, definition.getExternalProps)
val obj = getObject(data.identifier, data.pkgVersion, data.mimeType, data.publishType, readerConfig)(neo4JUtil, cassandraUtil,config)
try {
val messages: List[String] = validate(obj, obj.identifier, validateQuestion)
@@ -79,7 +80,7 @@ class QuestionRePublishFunction(config: QuestionSetRePublishConfig, httpUtil: Ht
logger.info("Question publishing completed successfully for : " + data.identifier)
} else {
val upPkgVersion = obj.pkgVersion + 1
val migrVer = 0.2
val migrVer = 2.2
val nodeId = obj.dbId
val errorMessages = messages.mkString("; ")
val query = s"""MATCH (n:domain{IL_UNIQUE_ID:"$nodeId"}) SET n.status="Failed", n.pkgVersion=$upPkgVersion, n.publishError="$errorMessages", n.migrationVersion=$migrVer, $auditPropsUpdateQuery;"""
@@ -90,7 +91,7 @@ class QuestionRePublishFunction(config: QuestionSetRePublishConfig, httpUtil: Ht
} catch {
case e: Exception => {
val upPkgVersion = obj.pkgVersion + 1
val migrVer = 0.2
val migrVer = 2.2
val nodeId = obj.dbId
val errorMessages = e.getLocalizedMessage
val query = s"""MATCH (n:domain{IL_UNIQUE_ID:"$nodeId"}) SET n.status="Failed", n.pkgVersion=$upPkgVersion, n.publishError="$errorMessages", n.migrationVersion=$migrVer, $auditPropsUpdateQuery;"""
@@ -68,9 +68,9 @@ class QuestionSetRePublishFunction(config: QuestionSetRePublishConfig, httpUtil:
override def processElement(data: PublishMetadata, context: ProcessFunction[PublishMetadata, String]#Context, metrics: Metrics): Unit = {
logger.info("QuestionSet publishing started for : " + data.identifier)
metrics.incCounter(config.questionSetRePublishEventCount)
val definition: ObjectDefinition = definitionCache.getDefinition(data.objectType, config.schemaSupportVersionMap.getOrElse(data.objectType.toLowerCase(), "1.0").asInstanceOf[String], config.definitionBasePath)
val definition: ObjectDefinition = definitionCache.getDefinition(data.objectType, data.schemaVersion, config.definitionBasePath)
val readerConfig = ExtDataConfig(config.questionSetKeyspaceName, config.questionSetTableName, definition.getExternalPrimaryKey, definition.getExternalProps)
val qDef: ObjectDefinition = definitionCache.getDefinition("Question", config.schemaSupportVersionMap.getOrElse("question", "1.0").asInstanceOf[String], config.definitionBasePath)
val qDef: ObjectDefinition = definitionCache.getDefinition("Question", data.schemaVersion, config.definitionBasePath)
val qReaderConfig = ExtDataConfig(config.questionKeyspaceName, qDef.getExternalTable, qDef.getExternalPrimaryKey, qDef.getExternalProps)
val obj = getObject(data.identifier, data.pkgVersion, data.mimeType, data.publishType, readerConfig)(neo4JUtil, cassandraUtil, config)
logger.info("processElement ::: obj metadata before publish ::: " + ScalaJsonUtil.serialize(obj.metadata))
@@ -99,7 +99,7 @@ class QuestionSetRePublishFunction(config: QuestionSetRePublishConfig, httpUtil:
metrics.incCounter(config.questionSetRePublishSuccessEventCount)
} else {
val upPkgVersion = obj.pkgVersion + 1
val migrVer = 0.2
val migrVer = 2.2
val nodeId = obj.dbId
val errorMessages = messages.mkString("; ")
val query = s"""MATCH (n:domain{IL_UNIQUE_ID:"$nodeId"}) SET n.status="Failed", n.pkgVersion=$upPkgVersion, n.publishError="$errorMessages", n.migrationVersion=$migrVer, $auditPropsUpdateQuery;"""
@@ -110,7 +110,7 @@ class QuestionSetRePublishFunction(config: QuestionSetRePublishConfig, httpUtil:
} catch {
case e: Exception => {
val upPkgVersion = obj.pkgVersion + 1
val migrVer = 0.2
val migrVer = 2.2
val nodeId = obj.dbId
val query = s"""MATCH (n:domain{IL_UNIQUE_ID:"$nodeId"}) SET n.status="Failed", n.pkgVersion=$upPkgVersion, n.publishError="${e.getMessage}", n.migrationVersion=$migrVer, $auditPropsUpdateQuery;"""
neo4JUtil.executeQuery(query)