From 73748ab56f02a2bfbdb64e05833cb82f24e1e937 Mon Sep 17 00:00:00 2001 From: Varsha Mahuli Date: Wed, 7 Aug 2024 18:06:35 +0530 Subject: [PATCH] rolled up operational report modifications --- .../ekstep/analytics/dashboard/DataUtil.scala | 62 +++++++++++++++++++ .../report/acbp/UserACBPReportModel.scala | 12 +++- .../report/assess/UserAssessmentModel.scala | 7 ++- .../blended/BlendedProgramReportModel.scala | 6 +- .../cba/CourseBasedAssessmentModel.scala | 7 ++- .../report/course/CourseReportModel.scala | 8 ++- .../report/enrolment/UserEnrolmentModel.scala | 6 +- .../report/user/UserReportModel.scala | 10 ++- 8 files changed, 104 insertions(+), 14 deletions(-) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala index db5b32fd..56efe873 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala @@ -1883,5 +1883,67 @@ object DataUtil extends Serializable { show(df, "ratings") df } + def processOrgsL3(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = { + val organisationDF = df.dropDuplicates() + val sumDF = organisationDF.withColumn("allIDs", lit(null).cast("string")).select(col("organisationID").alias("ministryID"), col("allIDs")) + sumDF + + } + + def processDepartmentL2(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = { + val organisationDF = df + .join(orgHierarchyCompleteDF, df("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left") + .select(df("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates() + val sumDF = organisationDF + .groupBy("departmentID") + .agg( + concat_ws(",", collect_set(when(col("organisationID").isNotNull, col("organisationID")))).alias("orgIDs") + ) + .withColumn("associatedIds", concat_ws(",", col("orgIDs"))) + .withColumn("allIDs", concat_ws(",", col("departmentID"), col("associatedIds"))) + .select(col("departmentID").alias("ministryID"), col("allIDs")) + sumDF + + } + + def processMinistryL1(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = { + println("Processing Ministry L1 DataFrame:") + val departmentAndMapIDsDF = df + .join(orgHierarchyCompleteDF, df("ministryMapID") === orgHierarchyCompleteDF("l1mapid"), "left") + .select(df("ministryID"), col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID")) + + // Join with orgHierarchyCompleteDF to get the organisationDF + val organisationDF = departmentAndMapIDsDF + .join(orgHierarchyCompleteDF, departmentAndMapIDsDF("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left") + .select(departmentAndMapIDsDF("ministryID"), departmentAndMapIDsDF("departmentID"),col("sborgid").alias("organisationID")).dropDuplicates() + val sumDF = organisationDF + .groupBy("ministryID") + .agg( + concat_ws(",", collect_set(when(col("departmentID").isNotNull, col("departmentID")))).alias("departmentIDs"), + concat_ws(",", collect_set(when(col("organisationID").isNotNull, col("organisationID")))).alias("orgIDs") + ) + .withColumn("associatedIds", concat_ws(",", col("departmentIDs"), col("orgIDs"))) + .withColumn("allIDs", concat_ws(",", col("ministryID"), col("associatedIds"))) + .select(col("ministryID"), col("allIDs")) + sumDF + } + + def getDetailedHierarchy(userOrgDF: DataFrame)(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = { + var orgHierarchyCompleteDF = orgCompleteHierarchyDataFrame() + var distinctMdoIDsDF = userOrgDF.select("userOrgID").distinct() + val joinedDF = orgHierarchyCompleteDF.join(distinctMdoIDsDF, orgHierarchyCompleteDF("sborgid") === distinctMdoIDsDF("userOrgID"), "inner") + println("The number of distinct orgs in orgHierarchy is: "+joinedDF.count()) + val ministryL1DF = joinedDF.filter(col("l1mapid").isNull && col("l2mapid").isNull && col("l3mapid").isNull).select(col("sborgid").alias("ministryID"), col("mapid").alias("ministryMapID")) + val ministryOrgDF = processMinistryL1(ministryL1DF, userOrgDF, orgHierarchyCompleteDF) + val departmentL2DF = joinedDF.filter(col("l2mapid").isNull && col("l1mapid").isNotNull || col("l3mapid").isNotNull).select(col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID")) + val deptOrgDF = processDepartmentL2(departmentL2DF, userOrgDF, orgHierarchyCompleteDF) + val orgsL3DF = joinedDF.filter((col("l3mapid").isNull) && col("l2mapid").isNotNull && col("l1mapid").isNotNull).select(col("sborgid").alias("organisationID")) + val orgsDF = processOrgsL3(orgsL3DF, userOrgDF, orgHierarchyCompleteDF) + val combinedMinistryMetricsDF = ministryOrgDF.union(deptOrgDF).union(orgsDF) + val updatedDF = combinedMinistryMetricsDF.withColumn("allIDs", when(col("allIDs").isNull || trim(col("allIDs")) === "", concat(lit("$"), col("ministryID"))).otherwise(col("allIDs"))) + show(updatedDF, "This will be the final hierarchy") + updatedDF + } + } diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala index 8608ae10..b84030a1 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/acbp/UserACBPReportModel.scala @@ -20,7 +20,7 @@ object UserACBPReportModel extends AbsDashboardModel { // get user and org data frames var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames() val orgHierarchyData = orgHierarchyDataframe() - + val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF) val userDataDF = userOrgDF .join(orgHierarchyData, Seq("userOrgID"), "left") .withColumn("designation", coalesce(col("professionalDetails.designation"), lit(""))) @@ -113,7 +113,10 @@ object UserACBPReportModel extends AbsDashboardModel { val reportPath = s"${conf.acbpReportPath}/${today}" generateReport(enrolmentReportDF.drop("mdoid"), s"${reportPath}/CBPEnrollmentReport", fileName="CBPEnrollmentReport") - generateReport(enrolmentReportDF, s"${conf.acbpMdoEnrolmentReportPath}/${today}","mdoid", "CBPEnrollmentReport") + val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF = enrolmentReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + generateReport(filteredCombinedReportDF, s"${conf.acbpMdoEnrolmentReportPath}/${today}","mdoid", "CBPEnrollmentReport") // to be removed once new security job is created if (conf.reportSyncEnable) { syncReports(s"${conf.localReportDir}/${reportPath}", s"${conf.acbpMdoEnrolmentReportPath}/${today}") @@ -150,7 +153,10 @@ object UserACBPReportModel extends AbsDashboardModel { ) show(userSummaryReportDF, "userSummaryReportDF") generateReport(userSummaryReportDF.drop("mdoid"), s"${reportPath}/CBPUserSummaryReport", fileName="CBPUserSummaryReport") - generateReport(userSummaryReportDF.coalesce(1), s"${conf.acbpMdoSummaryReportPath}/${today}","mdoid", "CBPUserSummaryReport") + val explodedDF1 = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF1 = userSummaryReportDF.join(explodedDF1, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF1 = combinedReportDF1.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + generateReport(filteredCombinedReportDF1.coalesce(1), s"${conf.acbpMdoSummaryReportPath}/${today}","mdoid", "CBPUserSummaryReport") // to be removed once new security job is created if(conf.reportSyncEnable) { syncReports(s"${conf.localReportDir}/${reportPath}", s"${conf.acbpMdoSummaryReportPath}/${today}") diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/assess/UserAssessmentModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/assess/UserAssessmentModel.scala index 001e82e6..c73a651f 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/assess/UserAssessmentModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/assess/UserAssessmentModel.scala @@ -25,7 +25,7 @@ object UserAssessmentModel extends AbsDashboardModel { // obtain user org data var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames() - + val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF) // get course details, with rating info val (hierarchyDF, allCourseProgramDetailsWithCompDF, allCourseProgramDetailsDF, allCourseProgramDetailsWithRatingDF) = contentDataFrames(orgDF) @@ -94,7 +94,10 @@ object UserAssessmentModel extends AbsDashboardModel { val reportPath = s"${conf.standaloneAssessmentReportPath}/${today}" // generateReport(df, s"${reportPath}-full") df = df.drop("assessID", "assessOrgID") - generateAndSyncReports(df, "mdoid", reportPath, "StandaloneAssessmentReport") + val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF = df.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + generateAndSyncReports(filteredCombinedReportDF, "mdoid", reportPath, "StandaloneAssessmentReport") Redis.closeRedisConnect() diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala index 08427af2..9502dca7 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/blended/BlendedProgramReportModel.scala @@ -26,6 +26,7 @@ object BlendedProgramReportModel extends AbsDashboardModel { // get user and user org data var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames() val orgHierarchyData = orgHierarchyDataframe() + val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF) var userDataDF = userOrgDF .withColumn("userPrimaryEmail", col("personalDetails.primaryEmail")) @@ -276,7 +277,10 @@ object BlendedProgramReportModel extends AbsDashboardModel { .withColumnRenamed("maskedPhone", "Phone_Number") show(cbpReportDF, "cbpReportDF") - generateAndSyncReports(cbpReportDF, "mdoid", reportPathCBP, "BlendedProgramReport") + val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF = cbpReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + generateAndSyncReports(filteredCombinedReportDF, "mdoid", reportPathCBP, "BlendedProgramReport") val df_warehouse = fullDF diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala index 7a57d005..f37c3d4b 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/cba/CourseBasedAssessmentModel.scala @@ -22,7 +22,7 @@ object CourseBasedAssessmentModel extends AbsDashboardModel { val today = getDate() var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames() - + val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF) // get course details, with rating info val (hierarchyDF, allCourseProgramDetailsWithCompDF, allCourseProgramDetailsDF, allCourseProgramDetailsWithRatingDF) = contentDataFrames(orgDF, Seq("Course", "Program", "Blended Program", "Standalone Assessment", "Curated Program"), runValidation = false) @@ -113,7 +113,10 @@ object CourseBasedAssessmentModel extends AbsDashboardModel { // generateReport(fullReportDF, s"${reportPath}-full") val mdoReportDF = fullReportDF.drop("assessID", "assessOrgID", "assessChildID", "userOrgID") - generateReport(mdoReportDF, reportPath,"mdoid", "UserAssessmentReport") + val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF = mdoReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + generateReport(filteredCombinedReportDF, reportPath,"mdoid", "UserAssessmentReport") // to be removed once new security job is created if (conf.reportSyncEnable) { syncReports(s"${conf.localReportDir}/${reportPath}", reportPath) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala index a3e97712..a5756fd9 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/course/CourseReportModel.scala @@ -19,7 +19,8 @@ object CourseReportModel extends AbsDashboardModel { val today = getDate() val orgDF = orgDataFrame() - + var (organisationDF, userDF, userOrgDF) = getOrgUserDataFrames() + val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF) // Get course data first val allCourseProgramDetailsDF = contentWithOrgDetailsDataFrame(orgDF, Seq("Course", "Program", "Blended Program", "CuratedCollections", "Curated Program")) // Get content resource hierarchy @@ -183,7 +184,10 @@ object CourseReportModel extends AbsDashboardModel { // generateReport(fullReportDF, s"${reportPath}-full") val mdoReportDF = fullReportDF.drop("courseID", "courseOrgID") - generateReport(mdoReportDF, reportPath,"mdoid", "ContentReport") + val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF = mdoReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + generateReport(filteredCombinedReportDF, reportPath,"mdoid", "ContentReport") // to be removed once new security job is created if (conf.reportSyncEnable) { syncReports(s"${conf.localReportDir}/${reportPath}", reportPath) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala index 1df0e1a8..38bd12cc 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/enrolment/UserEnrolmentModel.scala @@ -25,6 +25,7 @@ object UserEnrolmentModel extends AbsDashboardModel { //GET ORG DATA var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames() + val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF) val orgHierarchyData = orgHierarchyDataframe() val userDataDF = userOrgDF .join(orgHierarchyData, Seq("userOrgID"), "left") @@ -142,7 +143,10 @@ object UserEnrolmentModel extends AbsDashboardModel { // generateReport(fullReportDF, s"${reportPath}-full") val mdoReportDF = fullReportDF.drop("userID", "userOrgID", "courseID", "courseOrgID", "issuedCertificateCount", "courseStatus", "resourceCount", "resourcesConsumed", "rawCompletionPercentage") - generateReport(mdoReportDF, reportPath, "mdoid","ConsumptionReport") + val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF = mdoReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + generateReport(filteredCombinedReportDF, reportPath, "mdoid","ConsumptionReport") // to be removed once new security job is created if (conf.reportSyncEnable) { syncReports(s"${conf.localReportDir}/${reportPath}", reportPath) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala index c334329a..d855ae7d 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/report/user/UserReportModel.scala @@ -22,6 +22,7 @@ object UserReportModel extends AbsDashboardModel { var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames() val orgHierarchyData = orgHierarchyDataframe() + val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF) var weeklyClapsDF = learnerStatsDataFrame() var karmaPointsDF = userKarmaPointsSummaryDataFrame() karmaPointsDF = karmaPointsDF.withColumnRenamed("userid", "userID") @@ -64,10 +65,13 @@ object UserReportModel extends AbsDashboardModel { .coalesce(1) val reportPath = s"${conf.userReportPath}/${today}" - // generateReport(fullReportDF, s"${reportPath}-full") + //generateReport(fullReportDF, s"${reportPath}-full") val mdoWiseReportDF = fullReportDF.drop("userID", "userOrgID", "userCreatedBy") - - generateReport(mdoWiseReportDF, reportPath,"mdoid", "UserReport") + val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid") + val combinedReportDF = mdoWiseReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid"))) + val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1) + // Repartition by mdo_id and write to CSV + generateReport(filteredCombinedReportDF, reportPath,"mdoid", "UserReport") // to be removed once new security job is created if (conf.reportSyncEnable) { syncReports(s"${conf.localReportDir}/${reportPath}", reportPath)