diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala index 3214a0ef..1253613d 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala @@ -35,132 +35,131 @@ case class DummyInput(timestamp: Long) extends AlgoInput // no input, there are case class DummyOutput() extends Output with AlgoOutput // no output as we take care of kafka dispatches ourself case class DashboardConfig ( - debug: String, - validation: String, - // kafka connection config - broker: String, - compression: String, - // redis connection config - redisHost: String, - redisPort: Int, - redisDB: Int, - // for blob storage - store: String, - container: String, - key: String, - secret: String, - // other hosts connection config - sparkCassandraConnectionHost: String, sparkDruidRouterHost: String, - sparkElasticsearchConnectionHost: String, fracBackendHost: String, - sparkMongoConnectionHost: String, mlSparkDruidRouterHost: String, - mlSparkMongoConnectionHost: String, - // kafka topics - roleUserCountTopic: String, orgRoleUserCountTopic: String, - allCourseTopic: String, userCourseProgramProgressTopic: String, - fracCompetencyTopic: String, courseCompetencyTopic: String, expectedCompetencyTopic: String, - declaredCompetencyTopic: String, competencyGapTopic: String, userOrgTopic: String, orgTopic: String, - userAssessmentTopic: String, assessmentTopic: String, acbpEnrolmentTopic: String, - // cassandra key spaces - cassandraUserKeyspace: String, - cassandraCourseKeyspace: String, cassandraHierarchyStoreKeyspace: String, - cassandraUserFeedKeyspace: String, - // cassandra table details - cassandraUserTable: String, cassandraUserRolesTable: String, cassandraOrgTable: String, - cassandraUserEnrolmentsTable: String, cassandraContentHierarchyTable: String, - cassandraRatingSummaryTable: String, cassandraUserAssessmentTable: String, - cassandraRatingsTable: String, cassandraOrgHierarchyTable: String, - cassandraUserFeedTable: String, cassandraAcbpTable: String, - cassandraCourseBatchTable: String, - cassandraLearnerStatsTable: String, - cassandraKarmaPointsTable: String, - cassandraHallOfFameTable: String, - cassandraKarmaPointsLookupTable: String, - cassandraKarmaPointsSummaryTable: String, - cassandraLearnerLeaderBoardTable: String, - cassandraLearnerLeaderBoardLookupTable: String, - - //warehouse tables; - appPostgresHost: String, - appPostgresSchema: String, - appPostgresUsername: String, - appPostgresCredential: String, - appOrgHierarchyTable: String, - dwPostgresHost: String, - dwPostgresSchema: String, - dwPostgresUsername: String, - dwPostgresCredential: String, - dwUserTable: String, - dwCourseTable: String, - dwEnrollmentsTable: String, - dwOrgTable: String, - dwAssessmentTable: String, - dwBPEnrollmentsTable: String, - dwKcmDictionaryTable: String, - dwKcmContentTable: String, - dwCBPlanTable: String, - postgresCompetencyTable: String, - postgresCompetencyHierarchyTable: String, - - // redis keys - redisRegisteredOfficerCountKey: String, redisTotalOfficerCountKey: String, redisOrgNameKey: String, - redisTotalRegisteredOfficerCountKey: String, redisTotalOrgCountKey: String, - redisExpectedUserCompetencyCount: String, redisDeclaredUserCompetencyCount: String, - redisUserCompetencyDeclarationRate: String, redisOrgCompetencyDeclarationRate: String, - redisUserCompetencyGapCount: String, 
redisUserCourseEnrolmentCount: String, - redisUserCompetencyGapEnrolmentRate: String, redisOrgCompetencyGapEnrolmentRate: String, - redisUserCourseCompletionCount: String, redisUserCompetencyGapClosedCount: String, - redisUserCompetencyGapClosedRate: String, redisOrgCompetencyGapClosedRate: String, - - // mongoDB configurations - mongoDBCollection: String, - mongoDatabase: String, - platformRatingSurveyId: String, - - // for reports - mdoIDs: String, - localReportDir: String, - standaloneAssessmentReportPath: String, - userReportPath: String, - userEnrolmentReportPath: String, - courseReportPath: String, - taggedUsersPath: String, - cbaReportPath: String, - blendedReportPath: String, - orgHierarchyReportPath: String, - commsConsoleReportPath: String, - acbpReportPath: String, - acbpMdoEnrolmentReportPath: String, - acbpMdoSummaryReportPath: String, - kcmReportPath: String, - - commsConsolePrarambhEmailSuffix: String, - commsConsoleNumDaysToConsider: Int, - commsConsoleNumTopLearnersToConsider: Int, - commsConsolePrarambhTags: String, - commsConsolePrarambhNCount: Int, - commsConsolePrarambhCbpIds: String, - - //ml report config - gracePeriod: String, - solutionIDs: String, - baseUrlForEvidences: String, - mlMongoDatabase: String, - surveyCollection: String, - mlReportPath: String, - surveyQuestionReportColumnsConfig: String, - surveyStatusReportColumnsConfig: String, - includeExpiredSolutionIDs: Boolean, - - - prefixDirectoryPath: String, - destinationDirectoryPath: String, - directoriesToSelect: String, - password: String, - // for weekly claps - cutoffTime: Float, - // to enable/disable report sync - reportSyncEnable: Boolean - ) extends Serializable + debug: String, + validation: String, + // kafka connection config + broker: String, + compression: String, + // redis connection config + redisHost: String, + redisPort: Int, + redisDB: Int, + // for blob storage + store: String, + container: String, + key: String, + secret: String, + // other hosts connection config + sparkCassandraConnectionHost: String, sparkDruidRouterHost: String, + sparkElasticsearchConnectionHost: String, fracBackendHost: String, + sparkMongoConnectionHost: String, mlSparkDruidRouterHost: String, + mlSparkMongoConnectionHost: String, + // kafka topics + roleUserCountTopic: String, orgRoleUserCountTopic: String, + allCourseTopic: String, userCourseProgramProgressTopic: String, + fracCompetencyTopic: String, courseCompetencyTopic: String, expectedCompetencyTopic: String, + declaredCompetencyTopic: String, competencyGapTopic: String, userOrgTopic: String, orgTopic: String, + userAssessmentTopic: String, assessmentTopic: String, acbpEnrolmentTopic: String, + // cassandra key spaces + cassandraUserKeyspace: String, + cassandraCourseKeyspace: String, cassandraHierarchyStoreKeyspace: String, + cassandraUserFeedKeyspace: String, + // cassandra table details + cassandraUserTable: String, cassandraUserRolesTable: String, cassandraOrgTable: String, + cassandraUserEnrolmentsTable: String, cassandraContentHierarchyTable: String, + cassandraRatingSummaryTable: String, cassandraUserAssessmentTable: String, + cassandraRatingsTable: String, cassandraOrgHierarchyTable: String, + cassandraUserFeedTable: String, cassandraAcbpTable: String, + cassandraCourseBatchTable: String, + cassandraLearnerStatsTable: String, + cassandraKarmaPointsTable: String, + cassandraHallOfFameTable: String, + cassandraKarmaPointsLookupTable: String, + cassandraKarmaPointsSummaryTable: String, + cassandraLearnerLeaderBoardTable: String, + 
cassandraLearnerLeaderBoardLookupTable: String, + + //warehouse tables; + appPostgresHost: String, + appPostgresSchema: String, + appPostgresUsername: String, + appPostgresCredential: String, + appOrgHierarchyTable: String, + dwPostgresHost: String, + dwPostgresSchema: String, + dwPostgresUsername: String, + dwPostgresCredential: String, + dwUserTable: String, + dwCourseTable: String, + dwEnrollmentsTable: String, + dwOrgTable: String, + dwAssessmentTable: String, + dwBPEnrollmentsTable: String, + dwKcmDictionaryTable: String, + dwKcmContentTable: String, + dwCBPlanTable: String, + postgresCompetencyTable: String, + postgresCompetencyHierarchyTable: String, + + // redis keys + redisRegisteredOfficerCountKey: String, redisTotalOfficerCountKey: String, redisOrgNameKey: String, + redisTotalRegisteredOfficerCountKey: String, redisTotalOrgCountKey: String, + redisExpectedUserCompetencyCount: String, redisDeclaredUserCompetencyCount: String, + redisUserCompetencyDeclarationRate: String, redisOrgCompetencyDeclarationRate: String, + redisUserCompetencyGapCount: String, redisUserCourseEnrolmentCount: String, + redisUserCompetencyGapEnrolmentRate: String, redisOrgCompetencyGapEnrolmentRate: String, + redisUserCourseCompletionCount: String, redisUserCompetencyGapClosedCount: String, + redisUserCompetencyGapClosedRate: String, redisOrgCompetencyGapClosedRate: String, + + // mongoDB configurations + mongoDBCollection: String, + mongoDatabase: String, + platformRatingSurveyId: String, + + // for reports + mdoIDs: String, + localReportDir: String, + standaloneAssessmentReportPath: String, + userReportPath: String, + userEnrolmentReportPath: String, + courseReportPath: String, + taggedUsersPath: String, + cbaReportPath: String, + blendedReportPath: String, + orgHierarchyReportPath: String, + commsConsoleReportPath: String, + acbpReportPath: String, + acbpMdoEnrolmentReportPath: String, + acbpMdoSummaryReportPath: String, + kcmReportPath: String, + + commsConsolePrarambhEmailSuffix: String, + commsConsoleNumDaysToConsider: Int, + commsConsoleNumTopLearnersToConsider: Int, + commsConsolePrarambhTags: String, + commsConsolePrarambhNCount: Int, + commsConsolePrarambhCbpIds: String, + + //ml report config + gracePeriod: String, + solutionIDs: String, + baseUrlForEvidences: String, + mlMongoDatabase: String, + surveyCollection: String, + reportConfigCollection: String, + mlReportPath: String, + includeExpiredSolutionIDs: Boolean, + + + prefixDirectoryPath: String, + destinationDirectoryPath: String, + directoriesToSelect: String, + password: String, + // for weekly claps + cutoffTime: Float, + // to enable/disable report sync + reportSyncEnable: Boolean + ) extends Serializable object DashboardConfigParser extends Serializable { /* Config functions */ @@ -309,10 +308,10 @@ object DashboardConfigParser extends Serializable { baseUrlForEvidences = getConfigModelParam(config, "baseUrlForEvidences"), mlMongoDatabase = getConfigModelParam(config, "mlMongoDatabase"), surveyCollection = getConfigModelParam(config, "surveyCollection"), + reportConfigCollection = getConfigModelParam(config, "reportConfigCollection"), mlReportPath = getConfigModelParam(config, "mlReportPath"), - surveyQuestionReportColumnsConfig = getConfigModelParam(config, "surveyQuestionReportColumnsConfig"), - surveyStatusReportColumnsConfig = getConfigModelParam(config, "surveyStatusReportColumnsConfig"), - includeExpiredSolutionIDs = getConfigModelParam(config, "includeExpiredSolutionIDs").toBoolean, + includeExpiredSolutionIDs = 
getConfigModelParam(config, "includeExpiredSolutionIDs", "true").toBoolean, + // comms-console commsConsolePrarambhEmailSuffix = getConfigModelParam(config, "commsConsolePrarambhEmailSuffix", ".kb@karmayogi.in"), @@ -802,6 +801,22 @@ object DashboardUtil extends Serializable { filteredDf } + def mongodbReportConfigAsString(url: String, mongoDatabase: String, collection: String, filter: String)(implicit spark: SparkSession): String = { + val schema = new StructType() + .add("report", StringType, true) + .add("config", StringType, true) + val df = spark.read.schema(schema) + .format("com.mongodb.spark.sql.DefaultSource") + .option("uri", url) + .option("database", mongoDatabase) + .option("collection", collection) + .load() + val filteredDf = df.filter(col("report") === filter) + val configAsString = filteredDf.select("config").first().getString(0) + println(s"Report config for $filter \n " + configAsString) + configAsString + } + def writeToCassandra(data: DataFrame, keyspace: String, table: String)(implicit spark: SparkSession): Unit = { data.write.format("org.apache.spark.sql.cassandra") .mode("append") @@ -1205,4 +1220,4 @@ object Redis extends Serializable { dispatch(redisKey, df.toMap[T](keyField, valueField), replace) } -} \ No newline at end of file +} diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala index 84d64bd3..61e22b22 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala @@ -280,19 +280,14 @@ object DataUtil extends Serializable { StructField("endDate", DateType, nullable = true) )) - val surveyStatusCompletedDataSchema: StructType = StructType(Seq( - StructField("completed_at", StringType, nullable = true), - StructField("survey_submission_id", DateType, nullable = true) + val observationStatusCompletedDataSchema: StructType = StructType(Seq( + StructField("completedAt", StringType, nullable = true), + StructField("observationSubmissionId", StringType, nullable = true) )) - val surveyStatusInProgressDataSchema: StructType = StructType(Seq( - StructField("inprogress_at", StringType, nullable = true), - StructField("survey_submission_id", DateType, nullable = true) - )) - - val surveyStatusStartedDataSchema: StructType = StructType(Seq( - StructField("started_at", StringType, nullable = true), - StructField("survey_submission_id", DateType, nullable = true) + val observationStatusInProgressDataSchema: StructType = StructType(Seq( + StructField("inprogressAt", StringType, nullable = true), + StructField("observationSubmissionId", StringType, nullable = true) )) } @@ -1012,7 +1007,7 @@ object DataUtil extends Serializable { def userCourseProgramCompletionDataFrame(extraCols: Seq[String] = Seq(), datesAsLong: Boolean = false)(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = { val selectCols = Seq("userID", "courseID", "batchID", "courseProgress", "dbCompletionStatus", "courseCompletedTimestamp", - "courseEnrolledTimestamp", "lastContentAccessTimestamp", "issuedCertificateCount", "certificateGeneratedOn", "certificateID") ++ extraCols + "courseEnrolledTimestamp", "lastContentAccessTimestamp", "issuedCertificateCount", "firstCompletedOn", "certificateGeneratedOn", "certificateID") ++ extraCols var df = cache.load("enrolment") .where(expr("active=true")) @@ -1021,6 +1016,7 @@ object DataUtil extends Serializable { 
.withColumn("lastContentAccessTimestamp", col("lastcontentaccesstime")) .withColumn("issuedCertificateCount", size(col("issued_certificates"))) .withColumn("certificateGeneratedOn", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(0).getItem("lastIssuedOn"))) + .withColumn("firstCompletedOn", when(col("issued_certificates").isNull, "").otherwise(when(size(col("issued_certificates")) > 0, col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("lastIssuedOn")).otherwise(""))) .withColumn("certificateID", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(0).getItem("identifier"))) .withColumnRenamed("userid", "userID") .withColumnRenamed("courseid", "courseID") @@ -1517,6 +1513,15 @@ object DataUtil extends Serializable { df } + /** + * Reading karma points details + */ + def userKarmaPointsSummaryDataFrame()(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): DataFrame = { + val df = cache.load("userKarmaPointsSummary") + show(df, "Karma Points Summary data") + df + } + /** * Reading user_karma_points data */ @@ -1661,11 +1666,11 @@ object DataUtil extends Serializable { syncReports(reportTempPath, reportPath) } - def learnerLeaderBoardDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = { - val df = cache.load("learnerLeaderBoard") - show(df, "learnerLeaderBoard") - df - } + def learnerLeaderBoardDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = { + val df = cache.load("learnerLeaderBoard") + show(df, "learnerLeaderBoard") + df + } def getSolutionIdsAsDF(solutionIds: String)(implicit spark: SparkSession, sc: SparkContext): DataFrame = { val mdoIDs = solutionIds.split(",").map(_.toString).distinct @@ -1682,7 +1687,17 @@ object DataUtil extends Serializable { var df = druidDFOption(query, conf.mlSparkDruidRouterHost, limit = 1000000).orNull if (df == null) return emptySchemaDataFrame(Schema.solutionIdDataSchema) if (df.columns.contains("evidences")) { - df = df.withColumn("evidences", when(col("evidences").isNotNull && col("evidences") =!= "", concat(lit(conf.baseUrlForEvidences), col("evidences"))).otherwise(col("evidences"))) + val baseUrl = conf.baseUrlForEvidences + val addBaseUrl = udf((evidences: String) => { + if (evidences != null && evidences.trim.nonEmpty) { + evidences.split(", ") + .map(url => s"$baseUrl$url") + .mkString(",") + } else { + evidences + } + }) + df = df.withColumn("evidences", addBaseUrl(col("evidences"))) } df } @@ -1714,14 +1729,29 @@ object DataUtil extends Serializable { df } + def getReportConfig(filter: String)(implicit spark: SparkSession, conf: DashboardConfig): String = { + val completeUrl = s"mongodb://${conf.mlSparkMongoConnectionHost}:27017" + val reportConfig = mongodbReportConfigAsString(completeUrl, conf.mlMongoDatabase, conf.reportConfigCollection, filter) + reportConfig + } + def zipAndSyncReports(completePath: String, reportPath: String)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = { val folder = new File(completePath) val zipFilePath = completePath + ".zip" + /** Delete the existing .zip file if it exists */ + val reportName = completePath.split("/").last + val existingZipFile = new File(completePath + s"/$reportName.zip") + if (existingZipFile.exists()) existingZipFile.delete() + /** Delete .crc files */ + val crcFiles = folder.listFiles.filter(_.getName.endsWith(".crc")) + crcFiles.foreach(_.delete()) + /** Zip the 
folder */ + val zipFile = new ZipFile(zipFilePath) val parameters = new ZipParameters() parameters.setCompressionMethod(CompressionMethod.DEFLATE) parameters.setCompressionLevel(CompressionLevel.NORMAL) - /** Zip the folder */ + zipFile.addFolder(folder, parameters) /** Delete all files inside parent directory */ if (folder.isDirectory) FileUtils.cleanDirectory(folder) @@ -1734,4 +1764,34 @@ object DataUtil extends Serializable { syncReports(completePath, reportPath) } + def getObservationStatusCompletedData(solutionDf: DataFrame)(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = { + val modifiedSolutionDf = solutionDf + .withColumn("Status of Submission", lit(null).cast(StringType)) + .withColumn("Submission Date", lit(null).cast(StringType)) + val query = """SELECT completedAt, observationSubmissionId FROM \"sl-observation-status-completed\" """ + val statusCompletedQueryDf = druidDFOption(query, conf.mlSparkDruidRouterHost, limit = 1000000).orNull + if (statusCompletedQueryDf == null) return emptySchemaDataFrame(Schema.observationStatusCompletedDataSchema) + statusCompletedQueryDf.dropDuplicates() + + val statusCompletedJoinDf = modifiedSolutionDf.join(statusCompletedQueryDf, modifiedSolutionDf("Observation Submission Id") === statusCompletedQueryDf("observationSubmissionId"), "left") + val statusCompletedFinalDf = statusCompletedJoinDf + .withColumn("Status of Submission", when(col("observationSubmissionId").isNotNull, lit("Completed")).otherwise(col("Status of Submission"))) + .withColumn("Submission Date", when(col("observationSubmissionId").isNotNull, col("completedAt")).otherwise(col("Submission Date"))) + .drop("completedAt", "observationSubmissionId") + statusCompletedFinalDf + } + + def getObservationStatusInProgressData(solutionDf: DataFrame)(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = { + val query = """SELECT observationSubmissionId FROM \"sl-observation-status-inprogress\" """ + val statusInProgressQueryDf = druidDFOption(query, conf.mlSparkDruidRouterHost, limit = 1000000).orNull + if (statusInProgressQueryDf == null) return emptySchemaDataFrame(Schema.observationStatusInProgressDataSchema) + statusInProgressQueryDf.dropDuplicates() + + val statusInProgressJoinDf = solutionDf.join(statusInProgressQueryDf, solutionDf("Observation Submission Id") === statusInProgressQueryDf("observationSubmissionId"), "left") + val statusInProgressFinalDf = statusInProgressJoinDf + .withColumn("Status of Submission", when(col("observationSubmissionId").isNotNull, lit("In Progress")).otherwise(col("Status of Submission"))) + .drop("observationSubmissionId") + statusInProgressFinalDf + } + } diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/TestUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/TestUtil.scala index 883b0052..0559095d 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/TestUtil.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/TestUtil.scala @@ -56,11 +56,10 @@ object TestUtil extends Serializable { "solutionIDs" -> "", "mlMongoDatabase" -> "ml-survey", "surveyCollection" -> "solutions", + "reportConfigCollection" -> "dataProductConfigurations", "gracePeriod" -> "2", "baseUrlForEvidences" -> "www.https://igotkarmayogi.gov.in/", "mlReportPath" -> "standalone-reports/ml-report", - "surveyQuestionReportColumnsConfig" -> """{"reportColumns":{"createdBy":"UUID","createdAt":"Start Date","updatedAt":"Updated Date","completedDate":"Completed 
Date","organisationName":"Organisation Name","organisationId":"Organisation Id","surveyName":"Survey Name","surveyId":"Survey Id","surveySubmissionId":"Survey Submission Id","criteriaExternalId":"Criteria External Id","criteriaId":"Criteria Id","criteriaName":"Criteria Name","evidenceCount":"Evidence Count","isAPrivateProgram":"Private Program","questionExternalId":"Question External Id","questionName":"Question","questionResponseLabel":"Answer","questionECM":"Question ECM","questionId":"Question Id","questionResponseType":"Question Response Type","solutionExternalId":"Solution External Id","solutionId":"Solution Id","solutionName":"Solution Name","totalEvidences":"Total Evidences"},"userProfileColumns":{"firstName":"First Name"},"sortingColumns":"UUID,First Name,Start Date,Updated Date,Completed Date,Organisation Id,Organisation Name,Survey Id,Survey Name,Survey Submission Id,Criteria External Id,Criteria Id,Criteria Name,Private Program,Question External Id,Question Id,Question,Answer,Question ECM,Question Response Type,Evidence Count,Solution External Id,Solution Id,Solution Name,Total Evidences"}""", - "surveyStatusReportColumnsConfig" -> """{"reportColumns":{"createdBy":"UUID","completedDate":"Submission Date","createdAt":"Survey Started Date","organisationName":"Organisation Name","solutionExternalId":"Solution External Id","solutionId":"Solution Id","solutionName":"Solution Name","surveyName":"Survey Name","surveyId":"Survey Id","surveySubmissionId":"Survey Submission Id","isAPrivateProgram":"Private Program"},"userProfileColumns":{"firstName":"First Name"},"sortingColumns":"UUID,First Name,Organisation Name,Solution External Id,Solution Id,Solution Name,Survey Id,Survey Name,Survey Submission Id,Private Program,Survey Started Date,Submission Date"}""", "includeExpiredSolutionIDs" -> "true", "mlSparkDruidRouterHost" -> "192.168.3.91", "mlSparkMongoConnectionHost" -> "192.168.3.178", diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala index 316d6955..665907c5 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala @@ -107,8 +107,9 @@ object DataExhaustModel extends AbsDashboardModel { val learnerLeaderboardDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraLearnerLeaderBoardTable) show(learnerLeaderboardDF, "learnerLeaderboardDF") cache.write(learnerLeaderboardDF, "learnerLeaderBoard") - } - - + val userKarmaPointsSummaryDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraKarmaPointsSummaryTable) + show(userKarmaPointsSummaryDF, "userKarmaPointsSummaryDF") + cache.write(userKarmaPointsSummaryDF, "userKarmaPointsSummary") + } } diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala index 1cae8282..3b8c33d2 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala @@ -40,7 +40,7 @@ object UserACBPReportModel extends AbsDashboardModel { // get ACBP details data frame val acbpDF = acbpDetailsDF() - // CustomUser + // CustomUser val acbpCustomUserAllotmentDF = 
acbpDF .filter(col("assignmentType") === "CustomUser") .withColumn("userID", explode(col("assignmentTypeInfo"))) @@ -86,8 +86,10 @@ object UserACBPReportModel extends AbsDashboardModel { col("assignmentType").alias("allotment_type"), col("allotment_to"), col("courseID").alias("content_id"), - col("allocatedOn").alias("allocated_on"), - col("completionDueDate").alias("due_by"), + date_format(col("allocatedOn"), "yyyy-MM-dd HH:mm:ss").alias("allocated_on"), + date_format(col("completionDueDate"), "yyyy-MM-dd").alias("due_by"), + //col("allocatedOn").alias("allocated_on"), + //col("completionDueDate").alias("due_by"), col("acbpStatus").alias("status")) .distinct().orderBy("org_id","created_by","cbPlanName") show(cbPlanWarehouseDF, "cbPlanWarehouseDF") diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala index 1d23f6e0..129794c2 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala @@ -278,6 +278,7 @@ object BlendedProgramReportModel extends AbsDashboardModel { show(cbpReportDF, "cbpReportDF") generateAndSyncReports(cbpReportDF, "mdoid", reportPathCBP, "BlendedProgramReport") + val df_warehouse = fullDF .withColumn("data_last_generated_on", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss a")) .select( @@ -311,14 +312,14 @@ object BlendedProgramReportModel extends AbsDashboardModel { def bpBatchDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): (DataFrame, DataFrame) = { val batchDF = courseBatchDataFrame() var bpBatchDF = batchDF.select( - col("courseID").alias("bpID"), - col("batchID").alias("bpBatchID"), - col("courseBatchCreatedBy").alias("bpBatchCreatedBy"), - col("courseBatchName").alias("bpBatchName"), - col("courseBatchStartDate").alias("bpBatchStartDate"), - col("courseBatchEndDate").alias("bpBatchEndDate"), - col("courseBatchAttrs").alias("bpBatchAttrs") - ) + col("courseID").alias("bpID"), + col("batchID").alias("bpBatchID"), + col("courseBatchCreatedBy").alias("bpBatchCreatedBy"), + col("courseBatchName").alias("bpBatchName"), + col("courseBatchStartDate").alias("bpBatchStartDate"), + col("courseBatchEndDate").alias("bpBatchEndDate"), + col("courseBatchAttrs").alias("bpBatchAttrs") + ) .withColumn("bpBatchAttrs", from_json(col("bpBatchAttrs"), Schema.batchAttrsSchema)) .withColumn("bpBatchLocation", col("bpBatchAttrs.batchLocationDetails")) .withColumn("bpBatchCurrentSize", col("bpBatchAttrs.currentBatchSize")) @@ -379,3 +380,4 @@ object BlendedProgramReportModel extends AbsDashboardModel { bpChildDF } } + diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala index 344d4d3f..69af6102 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala @@ -129,7 +129,8 @@ object CourseBasedAssessmentModel extends AbsDashboardModel { col("assessment_type").alias("assessment_type"), col("totalAssessmentDuration").alias("assessment_duration"), 
col("totalAssessmentDuration").alias("time_spent_by_the_user"), - from_unixtime(col("assessEndTime"), "dd/MM/yyyy").alias("completion_date"), + //from_unixtime(col("assessEndTime"), "dd/MM/yyyy").alias("completion_date"), + date_format(from_unixtime(col("assessEndTime")), "yyyy-MM-dd HH:mm:ss").alias("completion_date"), col("assessOverallResult").alias("score_achieved"), col("assessMaxQuestions").alias("overall_score"), col("assessPercentage").alias("cut_off_percentage"), @@ -143,4 +144,4 @@ object CourseBasedAssessmentModel extends AbsDashboardModel { Redis.closeRedisConnect() } -} \ No newline at end of file +} diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/commsconsole/CommsReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/commsconsole/CommsReportModel.scala index 4fcd7660..0c7239c8 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/commsconsole/CommsReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/commsconsole/CommsReportModel.scala @@ -35,13 +35,15 @@ object CommsReportModel extends AbsDashboardModel { val userDF = spark.read.option("header", "true") .csv(s"${conf.localReportDir}/${conf.userReportPath}/${today}-warehouse") - .withColumn("registrationDate", to_date(col("user_registration_date"), dateFormat1)) + //.withColumn("registrationDate", to_date(col("user_registration_date"), dateFormat1)) + .withColumn("registrationDate", date_format(col("user_registration_date"), "dd/MM/yyyy HH:mm:ss a")) .select("user_id", "mdo_id", "full_name", "email", "phone_number", "roles", "registrationDate", "tag", "user_registration_date") .join(orgDF, Seq("mdo_id"), "left") val rawEnrollmentsDF = spark.read.option("header", "true") .csv(s"${conf.localReportDir}/${conf.userEnrolmentReportPath}/${today}-warehouse") - .withColumn("completionDate", to_date(col("completed_on"), dateFormat2)) + //.withColumn("completionDate", to_date(col("completed_on"), dateFormat2)) + .withColumn("completionDate", date_format(col("completed_on"), "dd/MM/yyyy HH:mm:ss a")) val enrollmentsDF = rawEnrollmentsDF .join(userDF, Seq("user_id"), "left") diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala index a346de28..a6b5f473 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala @@ -125,7 +125,7 @@ object CourseReportModel extends AbsDashboardModel { col("courseBatchEndDate").alias("batch_end_date"), col("courseDuration").alias("content_duration"), col("rating").alias("content_rating"), - col("courseLastPublishedOn").alias("last_published_on"), + date_format(col("courseLastPublishedOn"), "yyyy-MM-dd").alias("last_published_on"), col("ArchivedOn").alias("content_retired_on"), col("courseStatus").alias("content_status"), col("courseResourceCount").alias("resource_count"), @@ -139,3 +139,5 @@ object CourseReportModel extends AbsDashboardModel { } } + + diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala index fd1ca903..cf9e9122 100644 --- 
a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala @@ -56,8 +56,8 @@ object UserEnrolmentModel extends AbsDashboardModel { var df = allCourseProgramCompletionWithDetailsDFWithRating .durationFormat("courseDuration") - .withColumn("completedOn", to_date(col("courseCompletedTimestamp"), "dd/MM/yyyy")) - .withColumn("enrolledOn", to_date(col("courseEnrolledTimestamp"), "dd/MM/yyyy")) + .withColumn("completedOn", date_format(col("courseCompletedTimestamp"), "yyyy-MM-dd HH:mm:ss")) + .withColumn("enrolledOn", date_format(col("courseEnrolledTimestamp"), "yyyy-MM-dd HH:mm:ss")) .withColumn("courseLastPublishedOn", to_date(col("courseLastPublishedOn"), "dd/MM/yyyy")) .withColumn("courseBatchStartDate", to_date(col("courseBatchStartDate"), "dd/MM/yyyy")) .withColumn("courseBatchEndDate", to_date(col("courseBatchEndDate"), "dd/MM/yyyy")) @@ -81,49 +81,49 @@ object UserEnrolmentModel extends AbsDashboardModel { .withColumn("live_cbp_plan_mandate", when(col("liveCBPlan").isNull, false).otherwise(col("liveCBPlan"))) val fullReportDF = df.select( - col("userID"), - col("userOrgID"), - col("courseID"), - col("courseOrgID"), - col("fullName").alias("Full_Name"), - col("professionalDetails.designation").alias("Designation"), - col("personalDetails.primaryEmail").alias("Email"), - col("personalDetails.mobile").alias("Phone_Number"), - col("professionalDetails.group").alias("Group"), - col("Tag"), - col("ministry_name").alias("Ministry"), - col("dept_name").alias("Department"), - col("userOrgName").alias("Organization"), - col("courseOrgName").alias("Content_Provider"), - col("courseName").alias("Content_Name"), - col("category").alias("Content_Type"), - col("courseDuration").alias("Content_Duration"), - col("batchID").alias("Batch_Id"), - col("courseBatchName").alias("Batch_Name"), - col("courseBatchStartDate").alias("Batch_Start_Date"), - col("courseBatchEndDate").alias("Batch_End_Date"), - col("enrolledOn").alias("Enrolled_On"), - col("userCourseCompletionStatus").alias("Status"), - col("completionPercentage").alias("Content_Progress_Percentage"), - col("courseLastPublishedOn").alias("Last_Published_On"), - col("ArchivedOn").alias("Content_Retired_On"), - col("completedOn").alias("Completed_On"), - col("Certificate_Generated"), - col("userRating").alias("User_Rating"), - col("personalDetails.gender").alias("Gender"), - col("personalDetails.category").alias("Category"), - col("additionalProperties.externalSystem").alias("External_System"), - col("additionalProperties.externalSystemId").alias("External_System_Id"), - col("userOrgID").alias("mdoid"), - col("issuedCertificateCount"), - col("courseStatus"), - col("courseResourceCount").alias("resourceCount"), - col("courseProgress").alias("resourcesConsumed"), - round(expr("CASE WHEN courseResourceCount=0 THEN 0.0 ELSE 100.0 * courseProgress / courseResourceCount END"), 2).alias("rawCompletionPercentage"), - col("Certificate_ID"), - col("Report_Last_Generated_On"), - col("live_cbp_plan_mandate").alias("Live_CBP_Plan_Mandate") - ) + col("userID"), + col("userOrgID"), + col("courseID"), + col("courseOrgID"), + col("fullName").alias("Full_Name"), + col("professionalDetails.designation").alias("Designation"), + col("personalDetails.primaryEmail").alias("Email"), + col("personalDetails.mobile").alias("Phone_Number"), + col("professionalDetails.group").alias("Group"), + col("Tag"), + 
col("ministry_name").alias("Ministry"), + col("dept_name").alias("Department"), + col("userOrgName").alias("Organization"), + col("courseOrgName").alias("Content_Provider"), + col("courseName").alias("Content_Name"), + col("category").alias("Content_Type"), + col("courseDuration").alias("Content_Duration"), + col("batchID").alias("Batch_Id"), + col("courseBatchName").alias("Batch_Name"), + col("courseBatchStartDate").alias("Batch_Start_Date"), + col("courseBatchEndDate").alias("Batch_End_Date"), + col("enrolledOn").alias("Enrolled_On"), + col("userCourseCompletionStatus").alias("Status"), + col("completionPercentage").alias("Content_Progress_Percentage"), + col("courseLastPublishedOn").alias("Last_Published_On"), + col("ArchivedOn").alias("Content_Retired_On"), + col("completedOn").alias("Completed_On"), + col("Certificate_Generated"), + col("userRating").alias("User_Rating"), + col("personalDetails.gender").alias("Gender"), + col("personalDetails.category").alias("Category"), + col("additionalProperties.externalSystem").alias("External_System"), + col("additionalProperties.externalSystemId").alias("External_System_Id"), + col("userOrgID").alias("mdoid"), + col("issuedCertificateCount"), + col("courseStatus"), + col("courseResourceCount").alias("resourceCount"), + col("courseProgress").alias("resourcesConsumed"), + round(expr("CASE WHEN courseResourceCount=0 THEN 0.0 ELSE 100.0 * courseProgress / courseResourceCount END"), 2).alias("rawCompletionPercentage"), + col("Certificate_ID"), + col("Report_Last_Generated_On"), + col("live_cbp_plan_mandate").alias("Live_CBP_Plan_Mandate") + ) .coalesce(1) show(df, "df") @@ -139,8 +139,8 @@ object UserEnrolmentModel extends AbsDashboardModel { } val warehouseDF = df - .withColumn("certificate_generated_on",to_date(from_utc_timestamp(to_utc_timestamp(to_timestamp( - col("certificateGeneratedOn"), "yyyy-MM-dd'T'HH:mm:ss.SSS"), "UTC"), "IST"), "yyyy-MM-dd")) + .withColumn("certificate_generated_on",date_format(from_utc_timestamp(to_utc_timestamp(to_timestamp( + col("certificateGeneratedOn"), "yyyy-MM-dd'T'HH:mm:ss.SSS"), "UTC"), "IST"), "yyyy-MM-dd HH:mm:ss")) .withColumn("data_last_generated_on", date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss a")) .select( col("userID").alias("user_id"), @@ -148,8 +148,9 @@ object UserEnrolmentModel extends AbsDashboardModel { col("courseID").alias("content_id"), col("completionPercentage").alias("content_progress_percentage"), col("completedOn").alias("completed_on"), + date_format(col("firstCompletedOn"), "yyyy-MM-dd HH:mm:ss").alias("first_completed_on"), col("Certificate_Generated").alias("certificate_generated"), - col("certificate_generated_on"), + col("certificate_generated_on").alias("certificate_generated_on"), col("userRating").alias("user_rating"), col("courseProgress").alias("resource_count_consumed"), col("enrolledOn").alias("enrolled_on"), @@ -164,3 +165,4 @@ object UserEnrolmentModel extends AbsDashboardModel { } } + diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/question/ObservationQuestionReportJob.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/question/ObservationQuestionReportJob.scala new file mode 100644 index 00000000..adfbbd92 --- /dev/null +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/question/ObservationQuestionReportJob.scala @@ -0,0 +1,17 @@ +package org.ekstep.analytics.dashboard.report.observation.question + +import org.apache.spark.SparkContext +import 
org.ekstep.analytics.framework.{FrameworkContext, IJob, JobDriver} +import org.ekstep.analytics.framework.util.JobLogger + +object ObservationQuestionReportJob extends optional.Application with IJob { + implicit val className = "org.ekstep.analytics.dashboard.report.observation.question.ObservationQuestionReportJob" + + override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = { + implicit val sparkContext: SparkContext = sc.getOrElse(null); + JobLogger.log("Started executing Job") + JobDriver.run("batch", config, ObservationQuestionReportModel) + JobLogger.log("Job Completed.") + } + +} diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/question/ObservationQuestionReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/question/ObservationQuestionReportModel.scala new file mode 100644 index 00000000..cf5ccfbc --- /dev/null +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/question/ObservationQuestionReportModel.scala @@ -0,0 +1,138 @@ +package org.ekstep.analytics.dashboard.report.observation.question + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.spark.SparkContext +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.ekstep.analytics.dashboard.DashboardUtil._ +import org.ekstep.analytics.dashboard.DataUtil._ +import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig} +import org.ekstep.analytics.framework.FrameworkContext +import org.ekstep.analytics.framework.util.JobLogger +import org.joda.time.LocalDate +import org.joda.time.format.DateTimeFormat + +import java.text.SimpleDateFormat + +object ObservationQuestionReportModel extends AbsDashboardModel { + + implicit val className: String = "org.ekstep.analytics.dashboard.report.observation.question.ObservationQuestionReportModel" + + override def name() = "ObservationQuestionReportModel" + + def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = { + val today = getDate() + JobLogger.log("Querying mongo database to get report configurations") + val observationQuestionReportColumnsConfig = getReportConfig("observationQuestionReport") + val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + val observationQuestionReportColumnsConfigMap = mapper.readValue(observationQuestionReportColumnsConfig, classOf[Map[String, String]]) + val reportColumnsMap = observationQuestionReportColumnsConfigMap("reportColumns").asInstanceOf[Map[String, String]] + val userProfileColumnsMap = observationQuestionReportColumnsConfigMap("userProfileColumns").asInstanceOf[Map[String, String]] + val sortingColumns = observationQuestionReportColumnsConfigMap("sortingColumns") + val columnsToBeQueried = reportColumnsMap.keys.mkString(",") + ",userProfile" + val userProfileSchema = StructType(userProfileColumnsMap.keys.toSeq.map(key => StructField(key, StringType, nullable = true))) + val reportColumns = reportColumnsMap.keys.toList.map(key => col(key).as(reportColumnsMap(key))) + val userProfileColumns = userProfileColumnsMap.keys.toList.map(key => col(s"parsedProfile.$key").as(userProfileColumnsMap(key))) + val requiredCsvColumns = reportColumns ++ userProfileColumns + val reportPath = 
s"${conf.mlReportPath}/${today}/ObservationQuestionsReport" + + /** + * Check to see if there is any solutionId are passed from config if Yes generate report only for those ID's + * If not generate report for all unique solutionId's from druid sl-observation datasource. + */ + val solutionIds = conf.solutionIDs + if (solutionIds != null && solutionIds.trim.nonEmpty) { + JobLogger.log("Processing report requests for specified solutionId's") + val solutionIdsDF = getSolutionIdsAsDF(solutionIds) + + solutionIdsDF.collect().foreach { row => + val solutionId = row.getString(0) + val solutionName = row.getString(1) + JobLogger.log(s"Started processing report request for solutionId: $solutionId") + generateObservationQuestionReport(solutionId, solutionName) + } + } else { + JobLogger.log("Processing report requests for all solutionId's") + JobLogger.log("Querying druid to get all the unique solutionId's") + val solutionIdsDF = loadAllUniqueSolutionIds("sl-observation") + + if (conf.includeExpiredSolutionIDs) { + JobLogger.log("Generating report for all the expired solutionId's also") + solutionIdsDF.collect().foreach { row => + val solutionId = row.getString(0) + val solutionName = row.getString(1) + JobLogger.log(s"Started processing report request for solutionId: $solutionId") + generateObservationQuestionReport(solutionId, solutionName) + } + } else { + JobLogger.log("Query mongodb to get solution end-date for all the unique solutionId's") + val solutionsEndDateDF = getSolutionsEndDate(solutionIdsDF) + solutionsEndDateDF.collect().foreach { row => + val solutionId = row.getString(0) + val solutionName = row.getString(1) + val endDate = new SimpleDateFormat("yyyy-MM-dd").format(row.getDate(1)) + if (endDate != null) { + JobLogger.log(s"Started processing report request for solutionId: $solutionId") + if (isSolutionWithinReportDate(endDate)) { + JobLogger.log(s"Solution with Id $solutionId will ends on $endDate") + generateObservationQuestionReport(solutionId, solutionName) + } else { + JobLogger.log(s"Solution with Id $solutionId has ended on $endDate date, Hence not generating the report for this ID ") + } + } else { + JobLogger.log(s"End Date for solutionId: $solutionId is NULL, Hence skipping generating the report for this ID ") + } + } + } + + /** + * This method takes the endDate and checks if that date is within the Report Date + * + * @param endDate + * @return + */ + def isSolutionWithinReportDate(endDate: String): Boolean = { + val formatter = DateTimeFormat.forPattern("yyyy-MM-dd") + val today = LocalDate.now() + val updatedDate = today.minusDays(conf.gracePeriod.toInt) + val endDateOfSolution = formatter.parseLocalDate(endDate) + endDateOfSolution.isEqual(today) || (endDateOfSolution.isAfter(today) || endDateOfSolution.isAfter(updatedDate)) || endDateOfSolution.isEqual(updatedDate) + } + } + JobLogger.log("Zipping the csv content folder and syncing to blob storage") + zipAndSyncReports(s"${conf.localReportDir}/${reportPath}", reportPath) + JobLogger.log("Successfully zipped folder and synced to blob storage") + + /** + * This method takes solutionId to query, parse userProfile JSON and sort the CSV + * + * @param solutionId + */ + def generateObservationQuestionReport(solutionId: String, solutionName: String) = { + val dataSource = "sl-observation" + val originalSolutionDf = getSolutionIdData(columnsToBeQueried, dataSource, solutionId) + JobLogger.log(s"Successfully executed druid query for solutionId: $solutionId") + val finalSolutionDf = processProfileData(originalSolutionDf, 
userProfileSchema, requiredCsvColumns) + JobLogger.log(s"Successfully parsed userProfile key for solutionId: $solutionId") + val columnsMatch = validateColumns(finalSolutionDf, sortingColumns.split(",").map(_.trim)) + + if (columnsMatch == true) { + val columnsOrder = sortingColumns.split(",").map(_.trim) + val sortedFinalDF = finalSolutionDf.select(columnsOrder.map(col): _*) + val solutionsName = solutionName + .replaceAll("[^a-zA-Z0-9\\s]", "") + .replaceAll("\\s+", " ") + .trim() + generateReport(sortedFinalDF, s"${reportPath}", fileName = s"${solutionsName}-${solutionId}", fileSaveMode = SaveMode.Append) + + JobLogger.log(s"Successfully generated observation question csv report for solutionId: $solutionId") + } else { + JobLogger.log(s"Error occurred while matching the data frame columns with config sort columns for solutionId: $solutionId") + } + } + + } + +} diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/status/ObservationStatusReportJob.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/status/ObservationStatusReportJob.scala new file mode 100644 index 00000000..33e5f307 --- /dev/null +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/status/ObservationStatusReportJob.scala @@ -0,0 +1,17 @@ +package org.ekstep.analytics.dashboard.report.observation.status + +import org.apache.spark.SparkContext +import org.ekstep.analytics.framework.util.JobLogger +import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobDriver} + +object ObservationStatusReportJob extends optional.Application with IJob { + implicit val className = "org.ekstep.analytics.dashboard.report.observation.status.ObservationStatusReportJob" + + override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = { + implicit val sparkContext: SparkContext = sc.getOrElse(null); + JobLogger.log("Started executing Job") + JobDriver.run("batch", config, ObservationStatusReportModel) + JobLogger.log("Job Completed.") + } + +} \ No newline at end of file diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/status/ObservationStatusReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/status/ObservationStatusReportModel.scala new file mode 100644 index 00000000..9d50ecda --- /dev/null +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/observation/status/ObservationStatusReportModel.scala @@ -0,0 +1,170 @@ +package org.ekstep.analytics.dashboard.report.observation.status + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.functions.{coalesce, col} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.ekstep.analytics.dashboard.DashboardUtil._ +import org.ekstep.analytics.dashboard.DataUtil._ +import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig} +import org.ekstep.analytics.framework.FrameworkContext +import org.ekstep.analytics.framework.util.JobLogger +import org.joda.time.LocalDate +import org.joda.time.format.DateTimeFormat + +import java.text.SimpleDateFormat + +object ObservationStatusReportModel extends AbsDashboardModel { + + implicit val className: String = 
"org.ekstep.analytics.dashboard.report.observation.status.ObservationStatusReportModel" + + override def name() = "ObservationStatusReportModel" + + def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = { + + val today = getDate() + JobLogger.log("Querying mongo database to get report configurations") + val observationStatusReportColumnsConfig = getReportConfig("observationStatusReport") + val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + val observationStatusReportColumnsConfigMap = mapper.readValue(observationStatusReportColumnsConfig, classOf[Map[String, String]]) + val reportColumnsMap = observationStatusReportColumnsConfigMap("reportColumns").asInstanceOf[Map[String, String]] + val userProfileColumnsMap = observationStatusReportColumnsConfigMap("userProfileColumns").asInstanceOf[Map[String, String]] + val sortingColumns = observationStatusReportColumnsConfigMap("sortingColumns") + val columnsToBeQueried = reportColumnsMap.keys.mkString(",") + ",userProfile" + val userProfileSchema = StructType(userProfileColumnsMap.keys.toSeq.map(key => StructField(key, StringType, nullable = true))) + val reportColumns = reportColumnsMap.keys.toList.map(key => col(key).as(reportColumnsMap(key))) + val userProfileColumns = userProfileColumnsMap.keys.toList.map(key => col(s"parsedProfile.$key").as(userProfileColumnsMap(key))) + val requiredCsvColumns = reportColumns ++ userProfileColumns + val reportPath = s"${conf.mlReportPath}/${today}/ObservationCompletedSubmissionsReport" + + /** + * Check to see if there is any solutionId are passed from config if Yes generate report only for those ID's + * If not generate report for all unique solutionId's from druid sl-observation-meta datasource. 
+ */ + val solutionIds = conf.solutionIDs + if (solutionIds != null && solutionIds.trim.nonEmpty) { + JobLogger.log("Processing report requests from the configurations") + val solutionIdsDF = getSolutionIdsAsDF(solutionIds) + + solutionIdsDF.collect().foreach { row => + val solutionId = row.getString(0) + val solutionName = row.getString(1) + JobLogger.log(s"Started processing report request for solutionId: $solutionId") + generateObservationStatusReport(solutionId, solutionName) + } + } else { + JobLogger.log("Processing report requests for all solutionIds") + JobLogger.log("Querying druid to get all the unique solutionIds") + val solutionIdsDF = loadAllUniqueSolutionIds("sl-observation-meta") + + if (conf.includeExpiredSolutionIDs) { + JobLogger.log("Generating reports for expired solutionIds as well") + solutionIdsDF.collect().foreach { row => + val solutionId = row.getString(0) + val solutionName = row.getString(1) + JobLogger.log(s"Started processing report request for solutionId: $solutionId") + generateObservationStatusReport(solutionId, solutionName) + } + } else { + JobLogger.log("Querying mongodb to get the solution end date for all the unique solutionIds") + val solutionsEndDateDF = getSolutionsEndDate(solutionIdsDF) + solutionsEndDateDF.collect().foreach { row => + val solutionId = row.getString(0) + val solutionName = row.getString(1) + val endDate = new SimpleDateFormat("yyyy-MM-dd").format(row.getDate(1)) + if (endDate != null) { + JobLogger.log(s"Started processing report request for solutionId: $solutionId") + if (isSolutionWithinReportDate(endDate)) { + JobLogger.log(s"Solution with Id $solutionId ends on $endDate") + generateObservationStatusReport(solutionId, solutionName) + } else { + JobLogger.log(s"Solution with Id $solutionId ended on $endDate; hence not generating the report for this ID") + } + } else { + JobLogger.log(s"End date for solutionId: $solutionId is NULL; hence skipping report generation for this ID") + } + } + } + + /** + * This method checks whether the given endDate falls within the report date window + * + * @param endDate + * @return + */ + def isSolutionWithinReportDate(endDate: String): Boolean = { + val formatter = DateTimeFormat.forPattern("yyyy-MM-dd") + val today = LocalDate.now() + val updatedDate = today.minusDays(conf.gracePeriod.toInt) + val endDateOfSolution = formatter.parseLocalDate(endDate) + endDateOfSolution.isEqual(today) || (endDateOfSolution.isAfter(today) || endDateOfSolution.isAfter(updatedDate)) || endDateOfSolution.isEqual(updatedDate) + } + } + JobLogger.log("Zipping the csv content folder and syncing to blob storage") + zipAndSyncReports(s"${conf.localReportDir}/${reportPath}", reportPath) + JobLogger.log("Successfully zipped folder and synced to blob storage") + + /** + * This method queries data for the given solutionId, parses the userProfile JSON, appends status data and sorts the CSV + * + * @param solutionId + */ + def generateObservationStatusReport(solutionId: String, solutionName: String) = { + val dataSource = "sl-observation-meta" + val originalSolutionDf = getSolutionIdData(columnsToBeQueried, dataSource, solutionId) + JobLogger.log(s"Successfully executed druid query for solutionId: $solutionId") + val solutionWithUserProfileDF = processProfileData(originalSolutionDf, userProfileSchema, requiredCsvColumns) + JobLogger.log(s"Successfully parsed userProfile key for solutionId: $solutionId") + val finalSolutionDf = appendObservationStatusData(solutionWithUserProfileDF) + JobLogger.log(s"Successfully added 
observation status details for solutionId: $solutionId") + val columnsMatch = validateColumns(finalSolutionDf, sortingColumns.split(",").map(_.trim)) + + if (columnsMatch == true) { + val columnsOrder = sortingColumns.split(",").map(_.trim) + val sortedFinalDF = finalSolutionDf.select(columnsOrder.map(col): _*) + val solutionsName = solutionName + .replaceAll("[^a-zA-Z0-9\\s]", "") + .replaceAll("\\s+", " ") + .trim() + generateReport(sortedFinalDF, s"${reportPath}", fileName = s"${solutionsName}-${solutionId}", fileSaveMode = SaveMode.Append) + JobLogger.log(s"Successfully generated observation status csv report for solutionId: $solutionId") + } else { + JobLogger.log(s"Error occurred while matching the dataframe columns with config sort columns for solutionId: $solutionId") + } + } + + def appendObservationStatusData(solutionDf: DataFrame): DataFrame = { + JobLogger.log("Processing for observation completed status") + val completedStatus = getObservationStatusCompletedData(solutionDf) + val filteredCompletedStatus = completedStatus.filter(col("Status of Submission").isNull || col("Status of Submission") === "") + .select("Observation Submission Id", "Status of Submission") + + if (!filteredCompletedStatus.isEmpty) { + JobLogger.log("Processing for observation inprogress status") + val inProgressStatus = getObservationStatusInProgressData(filteredCompletedStatus) + val joinedCompletedAndInProgressDF = completedStatus + .join(inProgressStatus, Seq("Observation Submission Id"), "left") + .select( + completedStatus("*"), + coalesce(inProgressStatus("Status of Submission"), completedStatus("Status of Submission")).as("Updated Status of Submission") + ).drop("Status of Submission") + .withColumnRenamed("Updated Status of Submission", "Status of Submission") + val filteredInProgressStatus = joinedCompletedAndInProgressDF.filter(col("Status of Submission").isNull || col("Status of Submission") === "") + .select("Observation Submission Id", "Status of Submission") + + if (!filteredInProgressStatus.isEmpty) { + JobLogger.log("Processing for observation started status") + val joinedStartedAndInProgressDF = joinedCompletedAndInProgressDF.na.fill("Started", Seq("Status of Submission")) + return joinedStartedAndInProgressDF + } else { + return joinedCompletedAndInProgressDF + } + } + completedStatus + } + + } + +} diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala index 09547caf..4a871492 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala @@ -24,7 +24,8 @@ object QuestionReportModel extends AbsDashboardModel { def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = { val today = getDate() - val surveyQuestionReportColumnsConfig = conf.surveyQuestionReportColumnsConfig + JobLogger.log("Querying mongo database to get report configurations") + val surveyQuestionReportColumnsConfig = getReportConfig("surveyQuestionReport") val mapper = new ObjectMapper().registerModule(DefaultScalaModule) val surveyQuestionReportColumnsConfigMap = mapper.readValue(surveyQuestionReportColumnsConfig, classOf[Map[String, String]]) val reportColumnsMap = 
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala
index 09547caf..4a871492 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/question/QuestionReportModel.scala
@@ -24,7 +24,8 @@ object QuestionReportModel extends AbsDashboardModel {
   def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     val today = getDate()
-    val surveyQuestionReportColumnsConfig = conf.surveyQuestionReportColumnsConfig
+    JobLogger.log("Querying mongo database to get report configurations")
+    val surveyQuestionReportColumnsConfig = getReportConfig("surveyQuestionReport")
     val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
     val surveyQuestionReportColumnsConfigMap = mapper.readValue(surveyQuestionReportColumnsConfig, classOf[Map[String, String]])
     val reportColumnsMap = surveyQuestionReportColumnsConfigMap("reportColumns").asInstanceOf[Map[String, String]]
@@ -118,7 +119,12 @@ object QuestionReportModel extends AbsDashboardModel {
     if (columnsMatch == true) {
       val columnsOrder = sortingColumns.split(",").map(_.trim)
       val sortedFinalDF = finalSolutionDf.select(columnsOrder.map(col): _*)
-      generateReport(sortedFinalDF, s"${reportPath}", fileName = s"${solutionName}-${solutionId}", fileSaveMode = SaveMode.Append)
+      val solutionsName = solutionName
+        .replaceAll("[^a-zA-Z0-9\\s]", "")
+        .replaceAll("\\s+", " ")
+        .trim()
+      generateReport(sortedFinalDF, s"${reportPath}", fileName = s"${solutionsName}-${solutionId}", fileSaveMode = SaveMode.Append)
+      JobLogger.log(s"Successfully generated survey question csv report for solutionId: $solutionId")
     } else {
       JobLogger.log(s"Error occurred while matching the data frame columns with config sort columns for solutionId: $solutionId")
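Both survey report models above now pull their column configuration as a JSON string through getReportConfig and deserialize it with Jackson's Scala module before reading the reportColumns mapping. A rough, self-contained sketch of that parsing step; the JSON payload and the object name are made up for illustration:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object ReportConfigParseSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical payload shaped like a report configuration document.
    val configJson =
      """{"reportColumns":{"questionName":"Question","answer":"Answer"},"sortingColumns":"Question,Answer"}"""

    // Same parsing pattern as the models: Jackson with the Scala module,
    // then pull the reportColumns mapping out of the resulting Map.
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val configMap = mapper.readValue(configJson, classOf[Map[String, Any]])
    val reportColumns = configMap("reportColumns").asInstanceOf[Map[String, String]]
    val sortingColumns = configMap("sortingColumns").asInstanceOf[String]

    println(reportColumns)  // Map(questionName -> Question, answer -> Answer)
    println(sortingColumns) // Question,Answer
  }
}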
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/status/StatusReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/status/StatusReportModel.scala
index e8ac5297..07c6bc99 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/status/StatusReportModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/survey/status/StatusReportModel.scala
@@ -5,7 +5,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
 import org.ekstep.analytics.dashboard.DashboardUtil._
 import org.ekstep.analytics.dashboard.DataUtil._
 import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
@@ -25,7 +26,8 @@ object StatusReportModel extends AbsDashboardModel {
   def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     val today = getDate()
-    val surveyStatusReportColumnsConfig = conf.surveyStatusReportColumnsConfig
+    JobLogger.log("Querying mongo database to get report configurations")
+    val surveyStatusReportColumnsConfig = getReportConfig("surveyStatusReport")
     val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
     val surveyStatusReportColumnsConfigMap = mapper.readValue(surveyStatusReportColumnsConfig, classOf[Map[String, String]])
     val reportColumnsMap = surveyStatusReportColumnsConfigMap("reportColumns").asInstanceOf[Map[String, String]]
@@ -119,8 +121,12 @@ object StatusReportModel extends AbsDashboardModel {
     if (columnsMatch == true) {
       val columnsOrder = sortingColumns.split(",").map(_.trim)
       val sortedFinalDF = finalSolutionDf.select(columnsOrder.map(col): _*)
-      generateReport(sortedFinalDF, s"${reportPath}", fileName = s"${solutionName}-${solutionId}", fileSaveMode = SaveMode.Append)
-      JobLogger.log(s"Successfully generated survey question csv report for solutionId: $solutionId")
+      val solutionsName = solutionName
+        .replaceAll("[^a-zA-Z0-9\\s]", "")
+        .replaceAll("\\s+", " ")
+        .trim()
+      generateReport(sortedFinalDF, s"${reportPath}", fileName = s"${solutionsName}-${solutionId}", fileSaveMode = SaveMode.Append)
+      JobLogger.log(s"Successfully generated survey status csv report for solutionId: $solutionId")
     } else {
       JobLogger.log(s"Error occurred while matching the dataframe columns with config sort columns for solutionId: $solutionId")
     }
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala
index 73be6c23..0d516342 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala
@@ -29,12 +29,14 @@ object UserReportModel extends AbsDashboardModel {
     // var df = mdoIDDF.join(orgDF, Seq("orgID"), "inner").select(col("orgID").alias("userOrgID"), col("orgName"))
+    var karmaPointsDF = userKarmaPointsSummaryDataFrame()
+    karmaPointsDF = karmaPointsDF.withColumnRenamed("userid", "userID")
     val userData = userOrgDF
       .join(userRolesDF, Seq("userID"), "left")
+      .join(karmaPointsDF.select("userID", "total_points"), Seq("userID"), "left")
       .join(orgHierarchyData, Seq("userOrgID"), "left")
       .dropDuplicates("userID")
       .withColumn("Tag", concat_ws(", ", col("additionalProperties.tag")))
-      .where(expr("userStatus=1"))
     val fullReportDF = userData
       .withColumn("Report_Last_Generated_On", date_format(current_timestamp(), "dd/MM/yyyy HH:mm:ss a"))
@@ -78,6 +80,8 @@ object UserReportModel extends AbsDashboardModel {
       .select(
         col("userID").alias("user_id"),
         col("userOrgID").alias("mdo_id"),
+        col("userStatus").alias("status"),
+        col("total_points").alias("total_kp"),
         col("fullName").alias("full_name"),
         col("professionalDetails.designation").alias("designation"),
         col("personalDetails.primaryEmail").alias("email"),
@@ -85,7 +89,8 @@ object UserReportModel extends AbsDashboardModel {
         col("professionalDetails.group").alias("groups"),
         col("Tag").alias("tag"),
         col("userVerified").alias("is_verified_karmayogi"),
-        from_unixtime(col("userCreatedTimestamp"), "dd/MM/yyyy").alias("user_registration_date"),
+        //from_unixtime(col("userCreatedTimestamp"), "dd/MM/yyyy").alias("user_registration_date"),
+        date_format(from_unixtime(col("userCreatedTimestamp")), "yyyy-MM-dd HH:mm:ss").alias("user_registration_date"),
         col("role").alias("roles"),
         col("personalDetails.gender").alias("gender"),
         col("personalDetails.category").alias("category"),
@@ -101,3 +106,5 @@ object UserReportModel extends AbsDashboardModel {
   }
 }
+
+
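The user report now emits user_registration_date as a full timestamp (yyyy-MM-dd HH:mm:ss) derived from the epoch-seconds userCreatedTimestamp column, instead of the earlier dd/MM/yyyy date string. A minimal Spark sketch of that conversion; the session setup and the sample data are illustrative only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, date_format, from_unixtime}

object RegistrationDateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("registration-date-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Two hypothetical users with creation time as epoch seconds.
    val users = Seq(("u1", 1700000000L), ("u2", 1705000000L)).toDF("userID", "userCreatedTimestamp")

    // Same expression as the report column: epoch seconds -> "yyyy-MM-dd HH:mm:ss".
    val withDate = users.withColumn(
      "user_registration_date",
      date_format(from_unixtime(col("userCreatedTimestamp")), "yyyy-MM-dd HH:mm:ss"))

    withDate.show(false)
    spark.stop()
  }
}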
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/warehouse/DataWarehouseModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/warehouse/DataWarehouseModel.scala
index d30d47fd..97843928 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/warehouse/DataWarehouseModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/warehouse/DataWarehouseModel.scala
@@ -25,12 +25,11 @@ object DataWarehouseModel extends AbsDashboardModel {
     var user_details = spark.read.option("header", "true")
       .csv(s"${conf.localReportDir}/${conf.userReportPath}/${today}-warehouse")
-    user_details = user_details.withColumn("user_registration_date", to_date(col("user_registration_date"), "dd/MM/yyyy"))
-
+    user_details = user_details.withColumn("status", col("status").cast("int"))
+      .withColumn("total_kp", col("total_kp").cast("int"))
     truncateWarehouseTable(conf.dwUserTable)
     saveDataframeToPostgresTable_With_Append(user_details, dwPostgresUrl, conf.dwUserTable, conf.dwPostgresUsername, conf.dwPostgresCredential)
-
     var content_details = spark.read.option("header", "true")
       .csv(s"${conf.localReportDir}/${conf.courseReportPath}/${today}-warehouse")
@@ -38,14 +37,9 @@ object DataWarehouseModel extends AbsDashboardModel {
       .withColumn("resource_count", col("resource_count").cast("int"))
       .withColumn("total_certificates_issued", col("total_certificates_issued").cast("int"))
       .withColumn("content_rating", col("content_rating").cast("float"))
-      .withColumn("batch_start_date",to_date(col("batch_start_date"), "yyyy-MM-dd"))
-      .withColumn("batch_end_date", to_date(col("batch_end_date"), "yyyy-MM-dd"))
-      .withColumn("last_published_on", to_date(col("last_published_on"), "yyyy-MM-dd"))
-      .withColumn("content_retired_on", to_date(col("content_retired_on"), "yyyy-MM-dd"))
     content_details = content_details.dropDuplicates(Seq("content_id"))
-
     truncateWarehouseTable(conf.dwCourseTable)
     saveDataframeToPostgresTable_With_Append(content_details, dwPostgresUrl, conf.dwCourseTable, conf.dwPostgresUsername, conf.dwPostgresCredential)
@@ -56,9 +50,6 @@ object DataWarehouseModel extends AbsDashboardModel {
       .withColumn("content_progress_percentage", col("content_progress_percentage").cast("float"))
       .withColumn("user_rating", col("user_rating").cast("float"))
       .withColumn("resource_count_consumed", col("resource_count_consumed").cast("int"))
-      .withColumn("completed_on", to_date(col("completed_on"), "yyyy-MM-dd"))
-      .withColumn("certificate_generated_on", to_date(col("certificate_generated_on"), "yyyy-MM-dd"))
-      .withColumn("enrolled_on", to_date(col("enrolled_on"), "yyyy-MM-dd"))
       .withColumn("live_cbp_plan_mandate", col("live_cbp_plan_mandate").cast("boolean"))
       .filter(col("content_id").isNotNull)
@@ -75,28 +66,27 @@ object DataWarehouseModel extends AbsDashboardModel {
       .withColumn("total_question", col("total_question").cast("int"))
       .withColumn("number_of_incorrect_responses", col("number_of_incorrect_responses").cast("int"))
       .withColumn("number_of_retakes", col("number_of_retakes").cast("int"))
-      .withColumn("completion_date", to_date(col("completion_date"), "dd/MM/yyyy"))
       .filter(col("content_id").isNotNull)
     truncateWarehouseTable(conf.dwAssessmentTable)
     saveDataframeToPostgresTable_With_Append(assessment_details, dwPostgresUrl, conf.dwAssessmentTable, conf.dwPostgresUsername, conf.dwPostgresCredential)
-      var bp_enrollments = spark.read.option("header", "true")
+    var bp_enrollments = spark.read.option("header", "true")
       .csv(s"${conf.localReportDir}/${conf.blendedReportPath}/${today}-warehouse")
-      bp_enrollments = bp_enrollments
+    bp_enrollments = bp_enrollments
       .withColumn("component_progress_percentage", col("component_progress_percentage").cast("float"))
-      .withColumn("offline_session_date", to_date(col("offline_session_date"), "yyyy-MM-dd"))
-      .withColumn("component_completed_on", to_date(col("component_completed_on"), "yyyy-MM-dd"))
-      .withColumn("last_accessed_on", to_date(col("last_accessed_on"), "yyyy-MM-dd"))
-      .withColumnRenamed("instructor(s)_name", "instructors_name")
-      .filter(col("content_id").isNotNull)
-      .filter(col("user_id").isNotNull)
-      .filter(col("batch_id").isNotNull)
+      .withColumn("offline_session_date", to_date(col("offline_session_date"), "yyyy-MM-dd"))
+      .withColumn("component_completed_on", to_date(col("component_completed_on"), "yyyy-MM-dd"))
+      .withColumn("last_accessed_on", to_date(col("last_accessed_on"), "yyyy-MM-dd"))
+      .withColumnRenamed("instructor(s)_name", "instructors_name")
+      .filter(col("content_id").isNotNull)
+      .filter(col("user_id").isNotNull)
+      .filter(col("batch_id").isNotNull)
-      truncateWarehouseTable(conf.dwBPEnrollmentsTable)
-      saveDataframeToPostgresTable_With_Append(bp_enrollments, dwPostgresUrl, conf.dwBPEnrollmentsTable, conf.dwPostgresUsername, conf.dwPostgresCredential)
+    truncateWarehouseTable(conf.dwBPEnrollmentsTable)
+    saveDataframeToPostgresTable_With_Append(bp_enrollments, dwPostgresUrl, conf.dwBPEnrollmentsTable, conf.dwPostgresUsername, conf.dwPostgresCredential)
     val cb_plan = spark.read.option("header", "true")
       .csv(s"${conf.localReportDir}/${conf.acbpReportPath}/${today}-warehouse")
@@ -112,3 +102,4 @@ object DataWarehouseModel extends AbsDashboardModel {
   }
 }
+
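Because the warehouse CSVs are read with header-only inference (every column comes back as a string), the loader now casts numeric fields such as status and total_kp explicitly before the Postgres write, and the dropped to_date conversions mean date columns are passed through as strings. A condensed sketch of that cast-then-load pattern; the path, table name, credentials and the plain JDBC write are placeholders standing in for the project's own truncate-and-append helpers:

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object WarehouseCastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("warehouse-cast-sketch").master("local[*]").getOrCreate()

    // All columns are strings after a header-only CSV read.
    val userDetails = spark.read.option("header", "true").csv("/tmp/user-report-warehouse")

    // Cast the numeric fields the warehouse table expects, as the model does for status/total_kp.
    val typed = userDetails
      .withColumn("status", col("status").cast("int"))
      .withColumn("total_kp", col("total_kp").cast("int"))

    // Placeholder JDBC write; the project routes this through its own save helper.
    typed.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/warehouse")
      .option("dbtable", "user_detail")
      .option("user", "postgres")
      .option("password", "postgres")
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}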
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala b/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala
index d9bdb6aa..ac9c21e6 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala
@@ -14,6 +14,8 @@ import org.ekstep.analytics.dashboard.report.user.UserReportJob
 import org.ekstep.analytics.dashboard.survey.nps.NpsJob
 import org.ekstep.analytics.dashboard.report.course.CourseReportJob
 import org.ekstep.analytics.dashboard.report.enrolment.UserEnrolmentJob
+import org.ekstep.analytics.dashboard.report.observation.question.ObservationQuestionReportJob
+import org.ekstep.analytics.dashboard.report.observation.status.ObservationStatusReportJob
 import org.ekstep.analytics.dashboard.report.survey.question.QuestionReportJob
 import org.ekstep.analytics.dashboard.report.survey.status.StatusReportJob
 import org.ekstep.analytics.dashboard.report.warehouse.DataWarehouseJob
@@ -104,6 +106,10 @@ object JobFactory {
         QuestionReportJob
       case "survey-status-report" =>
         StatusReportJob
+      case "observation-question-report" =>
+        ObservationQuestionReportJob
+      case "observation-status-report" =>
+        ObservationStatusReportJob
       case _ => reflectModule(jobType);
     }
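With the two new cases wired into JobFactory, the observation reports resolve by job type like the existing survey reports. A usage sketch, assuming the factory exposes the conventional getJob(jobType) lookup that wraps the match shown above; that method name is an assumption, not confirmed by this diff:

import org.ekstep.analytics.job.JobFactory

object ObservationJobLookupSketch {
  def main(args: Array[String]): Unit = {
    // Job-type strings registered by this change.
    val observationJobTypes = Seq("observation-question-report", "observation-status-report")

    // Assumed lookup: JobFactory.getJob(jobType) backing the match above;
    // unknown types fall through to reflectModule.
    observationJobTypes.foreach { jobType =>
      val job = JobFactory.getJob(jobType)
      println(s"$jobType -> ${job.getClass.getName}")
    }
  }
}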