diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala
index 125be4f3..04200cc0 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardSyncModel.scala
@@ -55,15 +55,15 @@ object DashboardSyncModel extends AbsDashboardModel {
     val orgUserCountDF = orgUserCountDataFrame(orgDF, userDF)
     // validate activeOrgCount and orgUserCountDF count
     validate({orgUserCountDF.count()},
-      {userOrgDF.filter(expr("userStatus=1 AND userOrgID IS NOT NULL AND userOrgStatus=1")).select("userOrgID").distinct().count()},
-      "orgUserCountDF.count() should equal distinct active org count in userOrgDF")
+      {userOrgDF.filter(expr("userStatus=1 AND userOrgID IS NOT NULL AND userOrgStatus=1")).select("userOrgID").distinct().count()},
+      "orgUserCountDF.count() should equal distinct active org count in userOrgDF")

     //obtain and save total karma points of each user
     val karmaPointsDataDF = userKarmaPointsDataFrame()
-      .groupBy(col("userid").alias("userID")).agg(sum(col("points")).alias("total_points"))
+      .groupBy(col("userid").alias("userID")).agg(sum(col("points")).alias("total_points"))
     val kPointsWithUserOrgDF = karmaPointsDataDF.join(userOrgDF, karmaPointsDataDF("userID") === userOrgDF("userID"), "inner")
-      .select(karmaPointsDataDF("*"), userOrgDF("fullName"), userOrgDF("userOrgID"), userOrgDF("userOrgName"), userOrgDF("professionalDetails.designation").alias("designation"), userOrgDF("userProfileImgUrl"))
+      .select(karmaPointsDataDF("*"), userOrgDF("fullName"), userOrgDF("userOrgID"), userOrgDF("userOrgName"), userOrgDF("professionalDetails.designation").alias("designation"), userOrgDF("userProfileImgUrl"))

     val (hierarchyDF, allCourseProgramDetailsWithCompDF, allCourseProgramDetailsDF, allCourseProgramDetailsWithRatingDF) = contentDataFrames(orgDF)
@@ -95,12 +95,12 @@ object DashboardSyncModel extends AbsDashboardModel {
     // update redis key for top 10 learners in MDO channel
     val toJsonStringUDF = udf((userID: String, fullName: String, userOrgName: String, designation: String, userProfileImgUrl: String, total_points: Long, rank: Int) => {
       s"""{"userID":"$userID","fullName":"$fullName","userOrgName":"$userOrgName","designation":"$designation","userProfileImgUrl":"$userProfileImgUrl","total_points":$total_points,"rank":$rank}"""
-    })
+    })
     val windowSpec = Window.partitionBy("userOrgID").orderBy(col("total_points").desc)
     val rankedDF = kPointsWithUserOrgDF.withColumn("rank", rank().over(windowSpec))
     val top10LearnersByMDODF = rankedDF.filter(col("rank") <= 10)
     val jsonStringDF = top10LearnersByMDODF.withColumn("json_details", toJsonStringUDF(
-      col("userID"), col("fullName"), col("userOrgName"), col("designation"), col("userProfileImgUrl"), col("total_points"), col("rank")
+      col("userID"), col("fullName"), col("userOrgName"), col("designation"), col("userProfileImgUrl"), col("total_points"), col("rank")
     )).groupBy("userOrgID").agg(collect_list(col("json_details")).as("top_learners"))

     val resultDF = jsonStringDF.select(col("userOrgID"), to_json(struct(col("top_learners"))).alias("top_learners"))
@@ -116,10 +116,10 @@ object DashboardSyncModel extends AbsDashboardModel {
     cbpTop10Reviews(allCourseProgramDetailsWithRatingDF)

     Redis.closeRedisConnect()
-  }
+  }
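A side note on the top-10-learners hunk above: the JSON is assembled with a string-building UDF, which breaks if a name or designation contains a quote character. Spark's built-in to_json over a struct yields the same shape without a UDF; a minimal sketch (same columns, offered as an alternative, not what the patch does):

    // Sketch: build json_details with to_json(struct(...)) instead of a string UDF.
    val jsonStringViaStructDF = top10LearnersByMDODF
      .withColumn("json_details", to_json(struct(
        col("userID"), col("fullName"), col("userOrgName"), col("designation"),
        col("userProfileImgUrl"), col("total_points"), col("rank"))))
      .groupBy("userOrgID")
      .agg(collect_list(col("json_details")).as("top_learners"))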
 def dashboardRedisUpdates(orgRoleCount: DataFrame, userDF: DataFrame, allCourseProgramDetailsWithRatingDF: DataFrame,
-    allCourseProgramCompletionWithDetailsDF: DataFrame, allCourseProgramCompetencyDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
+    allCourseProgramCompletionWithDetailsDF: DataFrame, allCourseProgramCompetencyDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     import spark.implicits._
     // new redis updates - start
     // MDO onboarded, with atleast one MDO_ADMIN/MDO_LEADER
@@ -134,12 +134,12 @@ object DashboardSyncModel extends AbsDashboardModel {
     // new users registered yesterday
     val usersRegisteredYesterdayDF = userDF
-      .withColumn("yesterdayStartTimestamp", date_trunc("day", date_sub(current_timestamp(), 1)).cast("long"))
-      .withColumn("todayStartTimestamp", date_trunc("day", current_timestamp()).cast("long"))
+      .withColumn("yesterdayStartTimestamp", date_trunc("day", date_sub(current_timestamp(), 1)).cast("long"))
+      .withColumn("todayStartTimestamp", date_trunc("day", current_timestamp()).cast("long"))
     show(usersRegisteredYesterdayDF, "usersRegisteredYesterdayDF")
     val usersRegisteredYesterdayCount = usersRegisteredYesterdayDF
-      .where(expr("userCreatedTimestamp >= yesterdayStartTimestamp AND userCreatedTimestamp < todayStartTimestamp and userStatus=1"))
-      .count()
+      .where(expr("userCreatedTimestamp >= yesterdayStartTimestamp AND userCreatedTimestamp < todayStartTimestamp and userStatus=1"))
+      .count()
     Redis.update("dashboard_new_users_registered_yesterday", usersRegisteredYesterdayCount.toString)
     println(s"dashboard_new_users_registered_yesterday = ${usersRegisteredYesterdayCount}")
@@ -354,39 +354,61 @@ object DashboardSyncModel extends AbsDashboardModel {
     // get the count for each courseID
     val topContentCountDF = liveCourseProgramExcludingModeratedCompletedDF.groupBy("courseID").agg(count("*").alias("count"))
-    // Join count back to main DataFrame
     val liveCourseProgramExcludingModeratedCompletedWithCountDF = liveCourseProgramExcludingModeratedCompletedDF.join(topContentCountDF, "courseID")
-    // define a windowspec
-    val windowSpec = Window.partitionBy("courseOrgID").orderBy(col("count").desc)
-
-    val topCoursesByCBPDF = liveCourseProgramExcludingModeratedCompletedWithCountDF
-      .filter($"category" === "Course")
-      .withColumn("sorted_courseIDs", concat_ws(",", collect_set($"courseID").over(windowSpec)))
-      .groupBy("courseOrgID")
-      .agg(
-        concat_ws(",", collect_set($"sorted_courseIDs")).alias("sorted_courseIDs"),
-        concat(col("courseOrgID"), lit(":courses")).alias("courseOrgID:content"))
-      .select("courseOrgID:content","sorted_courseIDs")
-    show(topCoursesByCBPDF, "coursesDF")
-    val topProgramsByCBPDF = liveCourseProgramExcludingModeratedCompletedWithCountDF
-      .filter($"category" === "Program")
-      .withColumn("sorted_courseIDs", concat_ws(",", collect_set($"courseID").over(windowSpec)))
-      .groupBy("courseOrgID")
-      .agg(
-        concat_ws(",", collect_set($"sorted_courseIDs")).alias("sorted_courseIDs"),
-        concat(col("courseOrgID"), lit(":programs")).alias("courseOrgID:content"))
-      .select("courseOrgID:content","sorted_courseIDs")
-
-    val topAssessmentsByCBPDF = liveCourseProgramExcludingModeratedCompletedWithCountDF
-      .filter($"category" === "Standalone Assessment")
-      .withColumn("sorted_courseIDs", concat_ws(",", collect_set($"courseID").over(windowSpec)))
-      .groupBy("courseOrgID")
-      .agg(concat_ws(",", collect_set($"sorted_courseIDs")).alias("sorted_courseIDs"),
-        concat(col("courseOrgID"), lit(":assessments")).alias("courseOrgID:content")
-      )
-      .select("courseOrgID:content","sorted_courseIDs")
-    val combinedDFByCBP = topCoursesByCBPDF.union(topProgramsByCBPDF).union(topAssessmentsByCBPDF)
+    val topCoursesByCBPDF = liveCourseProgramExcludingModeratedCompletedDF
+      .filter($"category" === "Course")
+    val ecount1DF = topCoursesByCBPDF.groupBy("courseOrgID", "courseID")
+      .agg(countDistinct("userID").as("user_enrolment_count"))
+    val windowSpec1 = Window.partitionBy("courseOrgID").orderBy(col("user_enrolment_count").desc)
+    val sortedDF1 = ecount1DF
+      .groupBy("courseOrgID")
+      .agg(
+        collect_list("courseID").as("courseIDs"),
+        first("user_enrolment_count").as("user_enrolment_count")
+      )
+      .withColumn("sorted_courseIDs", concat_ws(",", col("courseIDs")))
+      .select(
+        concat(col("courseOrgID"), lit(":courses")).alias("courseOrgID:content"),
+        col("sorted_courseIDs")
+      )
+
+
+    val topProgramsByCBPDF = liveCourseProgramExcludingModeratedCompletedDF
+      .filter($"category" === "Program")
+    val ecount2DF = topProgramsByCBPDF.groupBy("courseOrgID", "courseID")
+      .agg(countDistinct("userID").as("user_enrolment_count"))
+    val windowSpec2 = Window.partitionBy("courseOrgID").orderBy(col("user_enrolment_count").desc)
+    val sortedDF2 = ecount2DF
+      .groupBy("courseOrgID")
+      .agg(
+        collect_list("courseID").as("courseIDs"),
+        first("user_enrolment_count").as("user_enrolment_count")
+      )
+      .withColumn("sorted_courseIDs", concat_ws(",", col("courseIDs")))
+      .select(
+        concat(col("courseOrgID"), lit(":programs")).alias("courseOrgID:content"),
+        col("sorted_courseIDs")
+      )
+
+
+    val topAssessmentsByCBPDF = liveCourseProgramExcludingModeratedCompletedDF
+      .filter($"category" === "Standalone Assessment")
+    val ecount3DF = topAssessmentsByCBPDF.groupBy("courseOrgID", "courseID")
+      .agg(countDistinct("userID").as("user_enrolment_count"))
+    val windowSpec3 = Window.partitionBy("courseOrgID").orderBy(col("user_enrolment_count").desc)
+    val sortedDF3 = ecount3DF
+      .groupBy("courseOrgID")
+      .agg(
+        collect_list("courseID").as("courseIDs"),
+        first("user_enrolment_count").as("user_enrolment_count")
+      )
+      .withColumn("sorted_courseIDs", concat_ws(",", col("courseIDs")))
+      .select(
+        concat(col("courseOrgID"), lit(":assessments")).alias("courseOrgID:content"),
+        col("sorted_courseIDs")
+      )
+    val combinedDFByCBP = sortedDF1.union(sortedDF2).union(sortedDF3)

     Redis.dispatchDataFrame[String]("dashboard_top_10_courses_by_completion_by_course_org", combinedDFByCBP, "courseOrgID:content", "sorted_courseIDs")
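One caution on the hunk above: the added code defines windowSpec1–3 but never applies them, and collect_list after a groupBy gives no ordering guarantee, so sorted_courseIDs is not actually sorted by enrolment (and first("user_enrolment_count") is arbitrary). A minimal sketch of one way to get a genuinely ordered list for the courses case (same column names, not part of the patch):

    // Sketch: rank per org by enrolment, then collect in rank order via sort_array on (rank, courseID) structs.
    val orderedDF1 = ecount1DF
      .withColumn("rn", row_number().over(windowSpec1))
      .where(col("rn") <= 10) // the Redis key name suggests only the top 10 are needed
      .groupBy("courseOrgID")
      .agg(sort_array(collect_list(struct(col("rn"), col("courseID")))).alias("ranked"))
      .select(
        concat(col("courseOrgID"), lit(":courses")).alias("courseOrgID:content"),
        concat_ws(",", col("ranked.courseID")).alias("sorted_courseIDs")
      )

The same shape would apply to the programs and assessments blocks with windowSpec2/windowSpec3.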
@@ -437,7 +459,7 @@ object DashboardSyncModel extends AbsDashboardModel {
     if (usersLoggedInLast24HrsByMDODF == null) {
       print("Empty dataframe: usersLoggedInLast24HrsByMDODF")
-    }
+    }
     else {
       usersLoggedInLast24HrsByMDODF = usersLoggedInLast24HrsByMDODF.withColumn("activeCount", expr("CAST(activeCount as LONG)"))
@@ -447,7 +469,7 @@ object DashboardSyncModel extends AbsDashboardModel {
       // Selecting required columns
       val loginPercentLast24HrsbyMDODF = loginPercentbyMDODF.select("userOrgID", "loginPercentage")
       Redis.dispatchDataFrame[Long]("dashboard_login_percent_last_24_hrs_by_user_org", loginPercentLast24HrsbyMDODF, "userOrgID", "loginPercentage")
-    }
+    }

     val certificateGeneratedByMDODF = certificateGeneratedDF.groupBy("userOrgID").agg(count("*").alias("count"), countDistinct("userID").alias("uniqueUserCount"))
     Redis.dispatchDataFrame[Long]("dashboard_certificates_generated_count_by_user_org", certificateGeneratedByMDODF, "userOrgID", "count")
@@ -464,13 +486,13 @@ object DashboardSyncModel extends AbsDashboardModel {
     // FROM \"dashboards-user-course-program-progress\" WHERE __time = (SELECT MAX(__time) FROM \"dashboards-user-course-program-progress\")
     // AND userStatus=1 AND category='Course' AND courseStatus IN ('Live', 'Retired') AND dbCompletionStatus=2 $mdo$ GROUP BY 1, 2, 3 ORDER BY completed_count DESC LIMIT 5
     val top5UsersByCompletionByMdoDF = liveRetiredCourseCompletedDF
-      .groupBy("userID", "fullName", "maskedEmail", "userOrgID", "userOrgName")
-      .agg(count("courseID").alias("completedCount"))
-      .groupByLimit(Seq("userOrgID"), "completedCount", 5, desc = true)
-      .withColumn("jsonData", struct("rowNum", "userID", "fullName", "maskedEmail", "userOrgID", "userOrgName", "completedCount"))
-      .orderBy(col("completedCount").desc)
-      .groupBy("userOrgID")
-      .agg(to_json(collect_list("jsonData")).alias("jsonData"))
+      .groupBy("userID", "fullName", "maskedEmail", "userOrgID", "userOrgName")
+      .agg(count("courseID").alias("completedCount"))
+      .groupByLimit(Seq("userOrgID"), "completedCount", 5, desc = true)
+      .withColumn("jsonData", struct("rowNum", "userID", "fullName", "maskedEmail", "userOrgID", "userOrgName", "completedCount"))
+      .orderBy(col("completedCount").desc)
+      .groupBy("userOrgID")
+      .agg(to_json(collect_list("jsonData")).alias("jsonData"))
     show(top5UsersByCompletionByMdoDF, "top5UsersByCompletionByMdoDF")
     Redis.dispatchDataFrame[String]("dashboard_top_5_users_by_completion_by_org", top5UsersByCompletionByMdoDF, "userOrgID", "jsonData")
@@ -484,18 +506,18 @@ object DashboardSyncModel extends AbsDashboardModel {
     // AND userStatus=1 AND category='Course' AND courseStatus IN ('Live', 'Retired') $mdo$
     // GROUP BY 1, 2, 3, 4 ORDER BY \"Course Completions\" DESC LIMIT 5
     val top5CoursesByCompletionByMdoDF = liveRetiredCourseEnrolmentDF
-      .groupBy("courseID", "courseName", "userOrgID", "userOrgName")
-      .agg(
-        count("userID").alias("enrolledCount"),
-        expr("SUM(CASE WHEN dbCompletionStatus=0 THEN 1 ELSE 0 END)").alias("notStartedCount"),
-        expr("SUM(CASE WHEN dbCompletionStatus=1 THEN 1 ELSE 0 END)").alias("inProgressCount"),
-        expr("SUM(CASE WHEN dbCompletionStatus=2 THEN 1 ELSE 0 END)").alias("completedCount")
-      )
-      .groupByLimit(Seq("userOrgID"), "completedCount", 5, desc = true)
-      .withColumn("jsonData", struct("rowNum", "courseID", "courseName", "userOrgID", "userOrgName", "enrolledCount", "notStartedCount", "inProgressCount", "completedCount"))
-      .orderBy(col("completedCount").desc)
-      .groupBy("userOrgID")
-      .agg(to_json(collect_list("jsonData")).alias("jsonData"))
+      .groupBy("courseID", "courseName", "userOrgID", "userOrgName")
+      .agg(
+        count("userID").alias("enrolledCount"),
+        expr("SUM(CASE WHEN dbCompletionStatus=0 THEN 1 ELSE 0 END)").alias("notStartedCount"),
+        expr("SUM(CASE WHEN dbCompletionStatus=1 THEN 1 ELSE 0 END)").alias("inProgressCount"),
+        expr("SUM(CASE WHEN dbCompletionStatus=2 THEN 1 ELSE 0 END)").alias("completedCount")
+      )
+      .groupByLimit(Seq("userOrgID"), "completedCount", 5, desc = true)
+      .withColumn("jsonData", struct("rowNum", "courseID", "courseName", "userOrgID", "userOrgName", "enrolledCount", "notStartedCount", "inProgressCount", "completedCount"))
+      .orderBy(col("completedCount").desc)
+      .groupBy("userOrgID")
+      .agg(to_json(collect_list("jsonData")).alias("jsonData"))
     show(top5CoursesByCompletionByMdoDF, "top5CoursesByCompletionByMdoDF")
     Redis.dispatchDataFrame[String]("dashboard_top_5_courses_by_completion_by_org", top5CoursesByCompletionByMdoDF, "userOrgID", "jsonData")
@@ -505,17 +527,17 @@ object DashboardSyncModel extends AbsDashboardModel {
     // AND ratingCount>0 AND ratingAverage<=5.0 AND category='Course' AND courseStatus='Live'
     // GROUP BY 1, 2, 3, 4 ORDER BY rating_count * rating_avg DESC LIMIT 5
     val top5CoursesByRatingDF = ratedLiveCourseDF
-      .where(expr("ratingCount>0 AND ratingAverage<=5.0"))
-      .withColumn("ratingMetric", expr("ratingCount * ratingAverage"))
-      .orderBy(col("ratingMetric").desc)
-      .limit(5)
-      .select(
-        col("courseID"),
-        col("courseName"),
-        col("courseOrgName"),
-        round(col("ratingAverage"), 1).alias("ratingAverage"),
-        col("ratingCount")
-      )
+      .where(expr("ratingCount>0 AND ratingAverage<=5.0"))
+      .withColumn("ratingMetric", expr("ratingCount * ratingAverage"))
+      .orderBy(col("ratingMetric").desc)
+      .limit(5)
+      .select(
+        col("courseID"),
+        col("courseName"),
+        col("courseOrgName"),
+        round(col("ratingAverage"), 1).alias("ratingAverage"),
+        col("ratingCount")
+      )
     show(top5CoursesByRatingDF, "top5CoursesByRatingDF")
     val top5CoursesByRatingJson = top5CoursesByRatingDF.toJSON.collectAsList().toString
     println(top5CoursesByRatingJson)
@@ -527,15 +549,15 @@ object DashboardSyncModel extends AbsDashboardModel {
     // AND userStatus=1 AND category='Course' AND courseStatus IN ('Live', 'Retired') AND dbCompletionStatus=2
     // GROUP BY 1, 2 ORDER BY completed_count DESC LIMIT 5
     val top5MdoByCompletionDF = liveRetiredCourseCompletedDF
-      .groupBy("userOrgID", "userOrgName")
-      .agg(count("courseID").alias("completedCount"))
-      .orderBy(col("completedCount").desc)
-      .limit(5)
-      .select(
-        col("userOrgID"),
-        col("userOrgName"),
-        col("completedCount")
-      )
+      .groupBy("userOrgID", "userOrgName")
+      .agg(count("courseID").alias("completedCount"))
+      .orderBy(col("completedCount").desc)
+      .limit(5)
+      .select(
+        col("userOrgID"),
+        col("userOrgName"),
+        col("completedCount")
+      )
     show(top5MdoByCompletionDF, "top5MdoByCompletionDF")
     val top5MdoByCompletionJson = top5MdoByCompletionDF.toJSON.collectAsList().toString
     println(top5MdoByCompletionJson)
@@ -547,38 +569,38 @@ object DashboardSyncModel extends AbsDashboardModel {
     // AND category='Course' AND courseStatus='Live'
     // GROUP BY 1, 2 ORDER BY published_count DESC LIMIT 5
     val top5MdoByLiveCoursesDF = liveCourseDF
-      .groupBy("courseOrgID", "courseOrgName")
-      .agg(count("courseID").alias("publishedCount"))
-      .orderBy(col("publishedCount").desc)
-      .limit(5)
-      .select(
-        col("courseOrgID"),
-        col("courseOrgName"),
-        col("publishedCount")
-      )
+      .groupBy("courseOrgID", "courseOrgName")
+      .agg(count("courseID").alias("publishedCount"))
+      .orderBy(col("publishedCount").desc)
+      .limit(5)
+      .select(
+        col("courseOrgID"),
+        col("courseOrgName"),
+        col("publishedCount")
+      )
     show(top5MdoByLiveCoursesDF, "top5MdoByLiveCoursesDF")
     val top5MdoByLiveCoursesJson = top5MdoByLiveCoursesDF.toJSON.collectAsList().toString
     println(top5MdoByLiveCoursesJson)
     Redis.update("dashboard_top_5_mdo_by_live_courses", top5MdoByLiveCoursesJson)

     // new redis updates - end
-  }
+  }

 def averageMonthlyActiveUsersDataFrame()(implicit spark: SparkSession, conf: DashboardConfig) : Long = {
-    val query = """SELECT ROUND(AVG(daily_count * 1.0), 0) as DAUOutput FROM (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\" WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2)"""
+    val query = """SELECT ROUND(AVG(daily_count * 1.0), 1) as DAUOutput FROM (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\" WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2)"""
     var df = druidDFOption(query, conf.sparkDruidRouterHost).orNull
     var averageMonthlyActiveUserCount = 0L
     if (df == null || df.isEmpty) return averageMonthlyActiveUserCount
     else {
-      df = df.withColumn("DAUOutput", expr("CAST(DAUOutput as LONG)"))
-      df = df.withColumn("DAUOutput", col("DAUOutput").cast("long"))
-      averageMonthlyActiveUserCount = df.select("DAUOutput").first().getLong(0)
-    }
+      df = df.withColumn("DAUOutput", expr("CAST(DAUOutput as LONG)"))
+      df = df.withColumn("DAUOutput", col("DAUOutput").cast("long"))
+      averageMonthlyActiveUserCount = df.select("DAUOutput").first().getLong(0)
+    }
     println("======222222222222=========")
     println(averageMonthlyActiveUserCount)
     averageMonthlyActiveUserCount
-  }
+  }
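One detail in averageMonthlyActiveUsersDataFrame: the Druid query now rounds the 30-day average to one decimal place, but the value is still cast to LONG on the Spark side, which drops that decimal again. If the fractional average is actually wanted, a sketch of reading it as a double instead (same column name):

    // Sketch: preserve the one-decimal average instead of truncating to a long.
    val avgDau: Double = df.select(col("DAUOutput").cast("double")).first().getDouble(0)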
 def updateLearnerHomePageData(orgDF: DataFrame, userOrgDF: DataFrame, userCourseProgramCompletionDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
@@ -597,47 +619,47 @@ object DashboardSyncModel extends AbsDashboardModel {
     if(!lastRunDate.equals(currentDateString)) {
       learnerHPRedisCalculations(cbpCompletionWithDetailsDF, cbpDetailsWithRatingDF)
-    } else {
+    } else {
       print("This is a second run today and the computation and redis key updates are not required")
-    }
+    }

     print("the current date is :" + currentDateString + "\n")
     Redis.update("lhp_lastRunDate", currentDateString)
-  }
+  }

 def learningHoursDiff(learningHoursTillDay0: DataFrame, learningHoursTillDay1: DataFrame, defaultLearningHours: DataFrame, prefix: String): DataFrame = {
     if (learningHoursTillDay0.isEmpty) {
       return defaultLearningHours
-    }
+    }

     learningHoursTillDay1
-      .withColumnRenamed("totalLearningHours", "learningHoursTillDay1")
-      .join(learningHoursTillDay0.withColumnRenamed("totalLearningHours", "learningHoursTillDay0"), Seq("userOrgID"), "left")
-      .na.fill(0.0, Seq("learningHoursTillDay0", "learningHoursTillDay1"))
-      .withColumn("totalLearningHours", expr("learningHoursTillDay1 - learningHoursTillDay0"))
-      .withColumn(s"userOrgID", concat(col("userOrgID"), lit(s":${prefix}")))
-      .select(s"userOrgID", "totalLearningHours")
-  }
+      .withColumnRenamed("totalLearningHours", "learningHoursTillDay1")
+      .join(learningHoursTillDay0.withColumnRenamed("totalLearningHours", "learningHoursTillDay0"), Seq("userOrgID"), "left")
+      .na.fill(0.0, Seq("learningHoursTillDay0", "learningHoursTillDay1"))
+      .withColumn("totalLearningHours", expr("learningHoursTillDay1 - learningHoursTillDay0"))
+      .withColumn(s"userOrgID", concat(col("userOrgID"), lit(s":${prefix}")))
+      .select(s"userOrgID", "totalLearningHours")
+  }

 def processLearningHours(courseProgramCompletionWithDetailsDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     // The courseDuration is coming in seconds from ES, so converting it to hours. Also courseProgress is number of leaf nodes
     // consumed and we should look at completion percentage as the % of learning hours
     val totalLearningHoursTillTodayByOrg = courseProgramCompletionWithDetailsDF
-      .filter("userOrgID IS NOT NULL AND TRIM(userOrgID) != ''")
-      .groupBy("userOrgID")
-      .agg(sum(expr("(completionPercentage / 100) * courseDuration")).alias("totalLearningSeconds"))
-      .withColumn("totalLearningHours", col("totalLearningSeconds") / 3600)
-      .drop("totalLearningSeconds")
+      .filter("userOrgID IS NOT NULL AND TRIM(userOrgID) != ''")
+      .groupBy("userOrgID")
+      .agg(sum(expr("(completionPercentage / 100) * courseDuration")).alias("totalLearningSeconds"))
+      .withColumn("totalLearningHours", col("totalLearningSeconds") / 3600)
+      .drop("totalLearningSeconds")
     show(totalLearningHoursTillTodayByOrg, "totalLearningHoursTillTodayByOrg")

     val totalLearningHoursTillYesterdayByOrg = Redis.getMapAsDataFrame("lhp_learningHoursTillToday", Schema.totalLearningHoursSchema)
-      .withColumn("totalLearningHours", col("totalLearningHours").cast(DoubleType))
+      .withColumn("totalLearningHours", col("totalLearningHours").cast(DoubleType))
     show(totalLearningHoursTillYesterdayByOrg, "totalLearningHoursTillYesterdayByOrg")

     val totalLearningHoursTillDayBeforeYesterdayByOrg = Redis.getMapAsDataFrame("lhp_learningHoursTillYesterday", Schema.totalLearningHoursSchema)
-      .withColumn("totalLearningHours", col("totalLearningHours").cast(DoubleType))
+      .withColumn("totalLearningHours", col("totalLearningHours").cast(DoubleType))
     show(totalLearningHoursTillDayBeforeYesterdayByOrg, "totalLearningHoursTillDayBeforeYesterdayByOrg")

     //one issue with the learningHoursDiff returning the totalLearningHoursTillTodayByOrg as default if the 1st input DF is
@@ -663,11 +685,11 @@ object DashboardSyncModel extends AbsDashboardModel {
     Redis.updateMapField("lhp_learningHours", "across:yesterday", totalLearningHoursYesterday)
     println("learning across today :" + totalLearningHoursToday)
     Redis.updateMapField("lhp_learningHours", "across:today", totalLearningHoursToday)
-  }
+  }

 def processCertifications(courseProgramCompletionWithDetailsDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     val totalCertificationsTillToday = courseProgramCompletionWithDetailsDF
-      .where(expr("courseStatus IN ('Live') AND userStatus=1 AND dbCompletionStatus = 2 AND issuedCertificateCount > 0")).count()
+      .where(expr("courseStatus IN ('Live') AND userStatus=1 AND dbCompletionStatus = 2 AND issuedCertificateCount > 0")).count()
     val totalCertificationsTillYesterdayStr = Redis.get("lhp_certificationsTillToday")
     val totalCertificationsTillYesterday = if (totalCertificationsTillYesterdayStr == "") { 0L } else { totalCertificationsTillYesterdayStr.toLong }
@@ -685,13 +707,13 @@ object DashboardSyncModel extends AbsDashboardModel {
     val startOf7thDay = currentDayStart.minusDays(7).getMillis / 1000

     val certificationsOfTheWeek = courseProgramCompletionWithDetailsDF
-      .where(expr(s"courseStatus IN ('Live') AND userStatus=1 AND courseCompletedTimestamp > '${startOf7thDay}' AND courseCompletedTimestamp < '${endOfCurrentDay}' AND dbCompletionStatus = 2 AND issuedCertificateCount > 0"))
+      .where(expr(s"courseStatus IN ('Live') AND userStatus=1 AND courseCompletedTimestamp > '${startOf7thDay}' AND courseCompletedTimestamp < '${endOfCurrentDay}' AND dbCompletionStatus = 2 AND issuedCertificateCount > 0"))

     val topNCertifications = certificationsOfTheWeek
-      .groupBy("courseID")
-      .agg(count("*").alias("courseCount"))
-      .orderBy(desc("courseCount"))
-      .limit(10)
+      .groupBy("courseID")
+      .agg(count("*").alias("courseCount"))
+      .orderBy(desc("courseCount"))
+      .limit(10)

     println("certifications till today :" + totalCertificationsTillToday)
     Redis.update("lhp_certificationsTillToday", totalCertificationsTillToday.toString)
@@ -703,81 +725,81 @@ object DashboardSyncModel extends AbsDashboardModel {
     Redis.updateMapField("lhp_certifications", "across:today", totalCertificationsToday.toString)

     val courseIdsString = topNCertifications
-      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)
+      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)
     print("trending certifications :" +courseIdsString + "\n")
     Redis.updateMapField("lhp_trending", "across:certifications", courseIdsString)

     val topNCertificationsByMDO = certificationsOfTheWeek
-      .groupBy("userOrgID", "courseID")
-      .agg(count("*").alias("courseCount"))
-      .orderBy(desc("courseCount"))
-      .groupBy("userOrgID")
-      .agg(concat_ws(",", collect_list("courseID")).alias("certifications"))
-      .withColumn("userOrgID:certifications", concat(col("userOrgID"), lit(":certifications")))
-      .limit(10)
+      .groupBy("userOrgID", "courseID")
+      .agg(count("*").alias("courseCount"))
+      .orderBy(desc("courseCount"))
+      .groupBy("userOrgID")
+      .agg(concat_ws(",", collect_list("courseID")).alias("certifications"))
+      .withColumn("userOrgID:certifications", concat(col("userOrgID"), lit(":certifications")))
+      .limit(10)
     show(topNCertificationsByMDO, "topNCertificationsByMDO")
     Redis.dispatchDataFrame[String]("lhp_trending", topNCertificationsByMDO, "userOrgID:certifications", "certifications", replace = false)
-  }
+  }

 def processTrending(allCourseProgramCompletionWithDetailsDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     val trendingCourses = allCourseProgramCompletionWithDetailsDF
-      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category = 'Course'")
-      .groupBy("courseID")
-      .agg(count("*").alias("enrollmentCount"))
-      .orderBy(desc("enrollmentCount"))
+      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category = 'Course'")
+      .groupBy("courseID")
+      .agg(count("*").alias("enrollmentCount"))
+      .orderBy(desc("enrollmentCount"))
     val totalCourseCount = trendingCourses.count()
     val courseLimitCount = (totalCourseCount * 0.10).toInt
     val trendingCourseIdsString = trendingCourses.limit(courseLimitCount)
-      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)
+      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)

     val trendingPrograms = allCourseProgramCompletionWithDetailsDF
-      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category IN ('Blended Program', 'Curated Program')")
-      .groupBy("courseID")
-      .agg(count("*").alias("enrollmentCount"))
-      .orderBy(desc("enrollmentCount"))
+      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category IN ('Blended Program', 'Curated Program')")
+      .groupBy("courseID")
+      .agg(count("*").alias("enrollmentCount"))
+      .orderBy(desc("enrollmentCount"))
     val totalProgramCount = trendingPrograms.count()
     val programLimitCount = (totalProgramCount * 0.10).toInt
     val trendingProgramIdsString = trendingPrograms.limit(programLimitCount)
-      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)
+      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)

     val trendingCoursesByOrg = allCourseProgramCompletionWithDetailsDF
-      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category = 'Course'")
-      .groupBy("userOrgID", "courseID")
-      .agg(count("*").alias("enrollmentCount"))
-      .groupByLimit(Seq("userOrgID"), "enrollmentCount", 50, desc = true)
-      .drop("enrollmentCount", "rowNum")
+      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category = 'Course'")
+      .groupBy("userOrgID", "courseID")
+      .agg(count("*").alias("enrollmentCount"))
+      .groupByLimit(Seq("userOrgID"), "enrollmentCount", 50, desc = true)
+      .drop("enrollmentCount", "rowNum")

     val trendingCoursesListByOrg = trendingCoursesByOrg
-      .groupBy("userOrgID")
-      .agg(collect_list("courseID").alias("courseIds"))
-      .withColumn("userOrgID:courses", expr("userOrgID"))
-      .withColumn("trendingCourseList", concat_ws(",", col("courseIds")))
-      .withColumn("userOrgID:courses", concat(col("userOrgID:courses"), lit(":courses")))
-      .select("userOrgID:courses", "trendingCourseList")
-      .filter(col("userOrgID:courses").isNotNull && col("userOrgID:courses") =!= "")
+      .groupBy("userOrgID")
+      .agg(collect_list("courseID").alias("courseIds"))
+      .withColumn("userOrgID:courses", expr("userOrgID"))
+      .withColumn("trendingCourseList", concat_ws(",", col("courseIds")))
+      .withColumn("userOrgID:courses", concat(col("userOrgID:courses"), lit(":courses")))
+      .select("userOrgID:courses", "trendingCourseList")
+      .filter(col("userOrgID:courses").isNotNull && col("userOrgID:courses") =!= "")

     val trendingProgramsByOrg = allCourseProgramCompletionWithDetailsDF
-      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category IN ('Blended Program', 'Curated Program')")
-      .groupBy("userOrgID", "courseID")
-      .agg(count("*").alias("enrollmentCount"))
-      .groupByLimit(Seq("userOrgID"), "enrollmentCount", 50, desc = true)
-      .drop("enrollmentCount", "rowNum")
+      .filter("dbCompletionStatus IN (0, 1, 2) AND courseStatus = 'Live' AND category IN ('Blended Program', 'Curated Program')")
+      .groupBy("userOrgID", "courseID")
+      .agg(count("*").alias("enrollmentCount"))
+      .groupByLimit(Seq("userOrgID"), "enrollmentCount", 50, desc = true)
+      .drop("enrollmentCount", "rowNum")

     val trendingProgramsListByOrg = trendingProgramsByOrg
-      .groupBy("userOrgID")
-      .agg(collect_list("courseID").alias("courseIds"))
-      .withColumn("userOrgID:programs", expr("userOrgID"))
-      .withColumn("trendingProgramList", concat_ws(",", col("courseIds")))
-      .withColumn("userOrgID:programs", concat(col("userOrgID:programs"), lit(":programs")))
-      .select("userOrgID:programs", "trendingProgramList")
-      .filter(col("userOrgID:programs").isNotNull && col("userOrgID:programs") =!= "")
+      .groupBy("userOrgID")
+      .agg(collect_list("courseID").alias("courseIds"))
+      .withColumn("userOrgID:programs", expr("userOrgID"))
+      .withColumn("trendingProgramList", concat_ws(",", col("courseIds")))
+      .withColumn("userOrgID:programs", concat(col("userOrgID:programs"), lit(":programs")))
+      .select("userOrgID:programs", "trendingProgramList")
+      .filter(col("userOrgID:programs").isNotNull && col("userOrgID:programs") =!= "")

     val mostEnrolledTag = trendingCourses.limit(courseLimitCount)
-      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)
+      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)

     print("trending courses :" +trendingCourseIdsString + "\n")
     Redis.updateMapField("lhp_trending", "across:courses", trendingCourseIdsString)
@@ -790,7 +812,7 @@ object DashboardSyncModel extends AbsDashboardModel {
     // print("most enrolled tag :" + mostEnrolledTag)
     Redis.update("lhp_mostEnrolledTag", mostEnrolledTag + "\n")
-  }
+  }

 def learnerHPRedisCalculations(cbpCompletionWithDetailsDF: DataFrame, cbpDetailsWithRatingDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     val courseProgramCompletionWithDetailsDF = cbpCompletionWithDetailsDF.where(expr("category IN ('Course', 'Program')"))
@@ -799,7 +821,7 @@ object DashboardSyncModel extends AbsDashboardModel {
     val cbpsUnder30minsDf = cbpDetailsWithRatingDF.where(expr("courseStatus='Live' and courseDuration < 1800 AND category IN ('Course', 'Program')") && !col("courseID").endsWith("_rc")).orderBy(desc("ratingAverage"))
     show(cbpsUnder30minsDf, "cbpsUnder30minsDf")
     val coursesUnder30mins = cbpsUnder30minsDf
-      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)
+      .agg(concat_ws(",", collect_list("courseID"))).first().getString(0)
     Redis.updateMapField("lhp_trending", "across:under_30_mins", coursesUnder30mins)

     // calculate and save learning hours to redis
@@ -810,23 +832,23 @@ object DashboardSyncModel extends AbsDashboardModel {

     // calculate and save trending data
     processTrending(cbpCompletionWithDetailsDF.where(expr("category != 'Program'")))
-  }
+  }

 def cbpTop10Reviews(allCourseProgramDetailsWithRatingDF: DataFrame)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
     // get rating table DF
     var ratingDf = getRatings()
     // filter records by courseID and rating greater than 4.5
     ratingDf = ratingDf
-      .join(allCourseProgramDetailsWithRatingDF, col("activityid").equalTo(col("courseID")), "inner")
-      .filter(col("review").isNotNull && col("rating").>=("4.5"))
+      .join(allCourseProgramDetailsWithRatingDF, col("activityid").equalTo(col("courseID")), "inner")
+      .filter(col("review").isNotNull && col("rating").>=("4.5"))
     ratingDf = ratingDf.select(
-      col("activityid").alias("courseID"),
-      col("courseOrgID"),
-      col("activitytype"),
-      col("rating"),
-      col("userid").alias("userID"),
-      col("review")
+      col("activityid").alias("courseID"),
+      col("courseOrgID"),
+      col("activitytype"),
+      col("rating"),
+      col("userid").alias("userID"),
+      col("review")
     ).orderBy(col("courseOrgID"))
     show(ratingDf)
@@ -840,15 +862,15 @@ object DashboardSyncModel extends AbsDashboardModel {

     // create JSON data for top 10 reviews by orgID
     val reviewDF = top10PerOrg
-      .groupByLimit(Seq("courseOrgID"), "rank", 10, desc = true)
-      .withColumn("jsonData", struct("courseID", "userID", "rating", "review"))
-      .groupBy("courseOrgID")
-      .agg(to_json(collect_list("jsonData")).alias("jsonData"))
+      .groupByLimit(Seq("courseOrgID"), "rank", 10, desc = true)
+      .withColumn("jsonData", struct("courseID", "userID", "rating", "review"))
+      .groupBy("courseOrgID")
+      .agg(to_json(collect_list("jsonData")).alias("jsonData"))

     // write to redis
     Redis.dispatchDataFrame[String]("cbp_top_10_users_reviews_by_org", reviewDF, "courseOrgID", "jsonData")
-  }
+  }
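Several hunks above lean on groupByLimit, which looks like a project-specific DataFrame extension rather than a Spark API. Assuming it means "keep the top N rows per group and expose a rowNum column", the usual window-based equivalent would be (a sketch under that assumption, not the project's actual implementation):

    // Sketch of the assumed groupByLimit(Seq("userOrgID"), "completedCount", 5, desc = true) semantics.
    val w = Window.partitionBy("userOrgID").orderBy(col("completedCount").desc)
    val top5PerOrg = df.withColumn("rowNum", row_number().over(w)).filter(col("rowNum") <= 5)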
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala
index 0283fc47..a995688f 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DashboardUtil.scala
@@ -78,6 +78,7 @@ case class DashboardConfig (
   cassandraKarmaPointsSummaryTable: String,
   cassandraLearnerLeaderBoardTable: String,
   cassandraLearnerLeaderBoardLookupTable: String,
+  cassandraMDOLearnerLeaderboardTable: String,

   //warehouse tables;
   appPostgresHost: String,
@@ -267,6 +268,7 @@ object DashboardConfigParser extends Serializable {
     cassandraKarmaPointsSummaryTable = getConfigModelParam(config, "cassandraKarmaPointsSummaryTable"),
     cassandraLearnerLeaderBoardTable = getConfigModelParam(config, "cassandraLearnerLeaderBoardTable"),
     cassandraLearnerLeaderBoardLookupTable = getConfigModelParam(config, "cassandraLearnerLeaderBoardLookupTable"),
+    cassandraMDOLearnerLeaderboardTable = getConfigModelParam(config, "cassandraMDOLearnerLeaderboardTable"),

     // redis keys
     redisRegisteredOfficerCountKey = "mdo_registered_officer_count",
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala
index 0599033d..1c88d0ae 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/DataUtil.scala
@@ -455,6 +455,11 @@ object DataUtil extends Serializable {
     roleDF
   }

+  def orgCompleteHierarchyDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
+    val orgCompleteHierarchyDF = cache.load("orgCompleteHierarchy")
+    orgCompleteHierarchyDF
+  }
+
   /**
    *
    * @param userOrgDF DataFrame(userID, firstName, lastName, maskedEmail, userStatus, userOrgID, userOrgName, userOrgStatus)
@@ -1026,7 +1031,6 @@ object DataUtil extends Serializable {
       .withColumn("courseEnrolledTimestamp", col("enrolled_date"))
       .withColumn("lastContentAccessTimestamp", col("lastcontentaccesstime"))
       .withColumn("issuedCertificateCount", size(col("issued_certificates")))
-      .withColumn("issuedCertificateCountPerContent", when(size(col("issued_certificates")) > 0, lit(1)).otherwise( lit(0)))
       .withColumn("certificateGeneratedOn", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("lastIssuedOn")))
       .withColumn("firstCompletedOn", when(col("issued_certificates").isNull, "").otherwise(when(size(col("issued_certificates")) > 0, col("issued_certificates")(0).getItem("lastIssuedOn")).otherwise("")))
       .withColumn("certificateID", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("identifier")))
@@ -1538,7 +1542,7 @@ object DataUtil extends Serializable {
    * Reading user_karma_points data
    */
  def userKarmaPointsDataFrame()(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): DataFrame = {
-    val df = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraKarmaPointsTable)
+    val df = cache.load("userKarmaPoints")
     show(df, "Karma Points data")
     df
   }
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala
index 55ea1794..49d2582e 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/exhaust/DataExhaustModel.scala
@@ -99,6 +99,8 @@ object DataExhaustModel extends AbsDashboardModel {
       .repartition(16)
     show(orgHierarchyDF, "orgHierarchyDF")
     cache.write(orgHierarchyDF, "orgHierarchy")
+    show(orgPostgresDF, "orgCompleteHierarchyDF")
+    cache.write(orgPostgresDF, "orgCompleteHierarchy")

     val userDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraUserTable)
     show(userDF, "userDF")
@@ -108,8 +110,13 @@ object DataExhaustModel extends AbsDashboardModel {
     show(learnerLeaderboardDF, "learnerLeaderboardDF")
     cache.write(learnerLeaderboardDF, "learnerLeaderBoard")

+    val userKarmaPointsDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraKarmaPointsTable)
+    show(userKarmaPointsDF, "Karma Points data")
+    cache.write(userKarmaPointsDF, "userKarmaPoints")
+
     val userKarmaPointsSummaryDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraKarmaPointsSummaryTable)
     show(userKarmaPointsSummaryDF, "userKarmaPointsSummaryDF")
     cache.write(userKarmaPointsSummaryDF, "userKarmaPointsSummary")
   }
 }
+
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministryleaderboard/MinistryLearnerLeaderboardJob.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministryleaderboard/MinistryLearnerLeaderboardJob.scala
new file mode 100644
index 00000000..3cfb490e
--- /dev/null
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministryleaderboard/MinistryLearnerLeaderboardJob.scala
@@ -0,0 +1,17 @@
+package org.ekstep.analytics.dashboard.ministryleaderboard
+
+import org.apache.spark.SparkContext
+import org.ekstep.analytics.framework.util.JobLogger
+import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobDriver}
+
+object MinistryLearnerLeaderboardJob extends optional.Application with IJob {
+
+  implicit val className = "org.ekstep.analytics.dashboard.ministryleaderboard.MinistryLearnerLeaderboardJob"
+
+  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
+    implicit val sparkContext: SparkContext = sc.getOrElse(null);
+    JobLogger.log("Started executing Job")
+    JobDriver.run("batch", config, MinistryLearnerLeaderboardModel)
+    JobLogger.log("Job Completed.")
+  }
+}
\ No newline at end of file
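MinistryLearnerLeaderboardJob follows the standard IJob wrapper pattern used elsewhere in this repo; together with the JobFactory cases added at the end of this patch it becomes addressable by job type. A hypothetical resolution check, assuming the conventional JobFactory.getJob(jobType) entry point:

    // Hypothetical usage: resolve the new job by its type key.
    val job = JobFactory.getJob("ministry-learner-leaderboard") // resolves to MinistryLearnerLeaderboardJob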
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministryleaderboard/MinistryLearnerLeaderboardModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministryleaderboard/MinistryLearnerLeaderboardModel.scala
new file mode 100644
index 00000000..f6f54b01
--- /dev/null
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministryleaderboard/MinistryLearnerLeaderboardModel.scala
@@ -0,0 +1,131 @@
+package org.ekstep.analytics.dashboard.ministryleaderboard
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.ekstep.analytics.dashboard.DataUtil._
+import org.ekstep.analytics.dashboard.DashboardUtil._
+import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
+import org.ekstep.analytics.framework.FrameworkContext
+
+
+object MinistryLearnerLeaderboardModel extends AbsDashboardModel {
+
+  implicit val className: String = "org.ekstep.analytics.dashboard.ministryleaderboard.MinistryLearnerLeaderboardModel"
+
+  override def name() = "MinistryLearnerLeaderboardModel"
+
+  def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
+
+    // get user and user-org data
+    var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()
+    var orgHierarchyCompleteDF = orgCompleteHierarchyDataFrame()
+    var distinctMdoIDsDF = userOrgDF.select("userOrgID").distinct()
+    println("The number of distinct MDO ids is: " + distinctMdoIDsDF.count())
+
+    val joinedDF = orgHierarchyCompleteDF.join(distinctMdoIDsDF, orgHierarchyCompleteDF("sborgid") === distinctMdoIDsDF("userOrgID"), "inner")
+    println("The number of distinct orgs in orgHierarchy is: " + joinedDF.count())
+    show(joinedDF, "orgHierarchyCompletedDF")
+
+    // get previous month start and end dates
+    val monthStart = date_format(date_trunc("MONTH", add_months(current_date(), -1)), "yyyy-MM-dd HH:mm:ss")
+    val monthEnd = date_format(last_day(add_months(current_date(), -1)), "yyyy-MM-dd 23:59:59")
+
+    // get previous month and year values; both are derived from the same anchor date
+    // so the pair stays consistent across the January wrap-around
+    val (month, year) = (
+      date_format(add_months(current_date(), -1), "M"),
+      date_format(add_months(current_date(), -1), "yyyy")
+    )
+
+    // get karma points data and filter for specific month
+    val karmaPointsDataDF = userKarmaPointsDataFrame()
+      .filter(col("credit_date") >= monthStart && col("credit_date") <= monthEnd)
+      .groupBy(col("userid")).agg(sum(col("points")).alias("total_points"), max(col("credit_date")).alias("last_credit_date"))
+    show(karmaPointsDataDF, "this is the kp_data")
+
+    def processOrgsL3(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
+      val organisationDF = df.dropDuplicates()
+      val userOrgData = userOrgDF.join(organisationDF, userOrgDF("userOrgID") === organisationDF("organisationID"), "inner")
+        .select(col("userID"), col("organisationID").alias("userParentID"), col("professionalDetails.designation").alias("designation"),
+          col("userProfileImgUrl"), col("fullName"), col("userOrgName")).distinct()
+
+      userOrgData
+    }
+
+    def processDepartmentL2(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
+      val organisationDF = df
+        .join(orgHierarchyCompleteDF, df("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "inner")
+        .select(df("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()
+
+      val userOrgData = userOrgDF.join(organisationDF, (userOrgDF("userOrgID") === organisationDF("departmentID")) ||
+        (userOrgDF("userOrgID") === organisationDF("organisationID")), "inner")
+        .select(col("userID"), col("departmentID").alias("userParentID"), col("professionalDetails.designation").alias("designation"),
+          col("userProfileImgUrl"), col("fullName"), col("userOrgName")).distinct()
+
+      userOrgData
+    }
+
+    def processMinistryL1(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompletedDF: DataFrame): DataFrame = {
+      println("Processing Ministry L1 DataFrame:")
+      val departmentAndMapIDsDF = df
+        .join(orgHierarchyCompleteDF, df("ministryMapID") === orgHierarchyCompleteDF("l1mapid"), "left")
+        .select(df("ministryID"), col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
+
+      // Join with orgHierarchyCompleteDF to get the organisationDF
+      val organisationDF = departmentAndMapIDsDF
+        .join(orgHierarchyCompleteDF, departmentAndMapIDsDF("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
+        .select(departmentAndMapIDsDF("ministryID"), departmentAndMapIDsDF("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()
+
+      val userOrgData = userOrgDF.join(organisationDF, (userOrgDF("userOrgID") === organisationDF("ministryID")) ||
+        (userOrgDF("userOrgID") === organisationDF("departmentID")) || (userOrgDF("userOrgID") === organisationDF("organisationID")), "inner")
+        .select(col("userID"), col("ministryID").alias("userParentID"), col("professionalDetails.designation").alias("designation"),
+          col("userProfileImgUrl"), col("fullName"), col("userOrgName")).distinct()
+
+      userOrgData
+    }
+
+    // Create DataFrames based on conditions
+    val ministryL1DF = joinedDF.filter(col("sborgtype") === "ministry").select(col("sborgid").alias("ministryID"), col("mapid").alias("ministryMapID"))
+    show(ministryL1DF, "MinistryData")
+    val ministryOrgDF = processMinistryL1(ministryL1DF, userOrgDF, orgHierarchyCompleteDF)
+    val departmentL2DF = joinedDF.filter(col("sborgtype") === "department").select(col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
+    show(departmentL2DF, "DepartmentData")
+    val deptOrgDF = processDepartmentL2(departmentL2DF, userOrgDF, orgHierarchyCompleteDF)
+    val orgsL3DF = joinedDF.filter((col("sborgtype") === "mdo") && (col("sborgsubtype") =!= "department")).select(col("sborgid").alias("organisationID"))
+    show(orgsL3DF, "OrgData")
+    val orgsDF = processOrgsL3(orgsL3DF, userOrgDF, orgHierarchyCompleteDF)
+
+    val userOrgData = ministryOrgDF.union(deptOrgDF).union(orgsDF)
+
+    var userLeaderBoardDataDF = userOrgData.join(karmaPointsDataDF, userOrgData("userID") === karmaPointsDataDF("userid"), "inner")
+      .filter(col("userParentID") =!= "")
+      .select(userOrgData("userID").alias("userid"),
+        userOrgData("userParentID").alias("org_id"),
+        userOrgData("fullName").alias("fullname"),
+        userOrgData("userProfileImgUrl").alias("profile_image"),
+        userOrgData("userOrgName").alias("org_name"),
+        userOrgData("designation"),
+        karmaPointsDataDF("total_points"),
+        karmaPointsDataDF("last_credit_date"))
+      .withColumn("month", month.cast("int"))
+      .withColumn("year", lit(year))
+
+    val windowSpecRank = Window.partitionBy("org_id").orderBy(desc("total_points"))
+    userLeaderBoardDataDF = userLeaderBoardDataDF.withColumn("rank", dense_rank().over(windowSpecRank))
+    userLeaderBoardDataDF.show(false)
+
+    // order rows within each rank group in each org by the most recent credit date
+    val windowSpecRow = Window.partitionBy("org_id").orderBy(col("rank"), col("last_credit_date").desc)
+    userLeaderBoardDataDF = userLeaderBoardDataDF.withColumn("row_num", row_number().over(windowSpecRow))
+
+    // write to cassandra ministry learner leaderboard
+    writeToCassandra(userLeaderBoardDataDF, conf.cassandraUserKeyspace, conf.cassandraMDOLearnerLeaderboardTable)
+  }
+}
\ No newline at end of file
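On the month/year computation in MinistryLearnerLeaderboardModel (adjusted above so month and year come from the same anchor date instead of the original "current month minus one", which returned 0 in January): the same values can also be computed once on the driver with java.time, which keeps the Cassandra columns plain literals. A sketch:

    // Sketch: previous month and year computed on the driver with java.time.
    import java.time.LocalDate
    val prevMonthDate = LocalDate.now().minusMonths(1)
    val (prevMonth, prevYear) = (prevMonthDate.getMonthValue, prevMonthDate.getYear)
    // ... .withColumn("month", lit(prevMonth)).withColumn("year", lit(prevYear))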
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsJob.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsJob.scala
new file mode 100644
index 00000000..cb459eb2
--- /dev/null
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsJob.scala
@@ -0,0 +1,17 @@
+package org.ekstep.analytics.dashboard.ministrymetrics
+
+import org.apache.spark.SparkContext
+import org.ekstep.analytics.framework.util.JobLogger
+import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobDriver}
+
+object MinistryMetricsJob extends optional.Application with IJob {
+
+  implicit val className = "org.ekstep.analytics.dashboard.ministrymetrics.MinistryMetricsJob"
+
+  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
+    implicit val sparkContext: SparkContext = sc.getOrElse(null);
+    JobLogger.log("Started executing Job")
+    JobDriver.run("batch", config, MinistryMetricsModel)
+    JobLogger.log("Job Completed.")
+  }
+}
\ No newline at end of file
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsModel.scala
new file mode 100644
index 00000000..0c559647
--- /dev/null
+++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/ministrymetrics/MinistryMetricsModel.scala
@@ -0,0 +1,217 @@
+package org.ekstep.analytics.dashboard.ministrymetrics
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.ekstep.analytics.dashboard.DataUtil._
+import org.ekstep.analytics.dashboard.DashboardUtil._
+import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
+import org.ekstep.analytics.framework.FrameworkContext
+
+
+object MinistryMetricsModel extends AbsDashboardModel {
+
+  implicit val className: String = "org.ekstep.analytics.dashboard.ministrymetrics.MinistryMetricsModel"
+
+  override def name() = "MinistryMetricsModel"
+
+  def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {
+    import spark.implicits._
+
+    // get user and user-org data
+    var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()
+    var orgHierarchyCompleteDF = orgCompleteHierarchyDataFrame()
+    var distinctMdoIDsDF = userOrgDF.select("userOrgID").distinct()
+    println("The number of distinct MDO ids is: " + distinctMdoIDsDF.count())
+
+    val joinedDF = orgHierarchyCompleteDF.join(distinctMdoIDsDF, orgHierarchyCompleteDF("sborgid") === distinctMdoIDsDF("userOrgID"), "inner")
+    println("The number of distinct orgs in orgHierarchy is: " + joinedDF.count())
+    show(joinedDF, "orgHierarchyCompletedDF")
+
+    val userSumDF = Redis.getMapAsDataFrame("dashboard_user_count_by_user_org", Schema.totalLearningHoursSchema)
+    val userLoginpercentDF = Redis.getMapAsDataFrame("dashboard_login_percent_last_24_hrs_by_user_org", Schema.totalLearningHoursSchema)
+    val certSumDF = Redis.getMapAsDataFrame("dashboard_certificates_generated_count_by_user_org", Schema.totalLearningHoursSchema)
+    val enrolmentDF = Redis.getMapAsDataFrame("dashboard_enrolment_content_by_user_org", Schema.totalLearningHoursSchema)
+
+    def processOrgsL3(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
+
+      val organisationDF = df.dropDuplicates()
+      val sumDF = organisationDF
+
+      val userJoinedDF = sumDF.join(userSumDF, $"userOrgID" === $"organisationID", "left_outer")
+        .groupBy("organisationID")
+        .agg(sum($"totalLearningHours").alias("learningSumValue"))
+
+      val loginJoinedDF = userJoinedDF
+        .join(userLoginpercentDF, $"userOrgID" === $"organisationID", "left_outer")
+        .groupBy("organisationID", "learningSumValue")
+        .agg(sum($"totalLearningHours").alias("loginSumValue"))
+
+      // Join loginJoinedDF with certSumDF for certSumValue
+      val certJoinedDF = loginJoinedDF
+        .join(certSumDF, $"userOrgID" === $"organisationID", "left_outer")
+        .groupBy("organisationID", "learningSumValue", "loginSumValue")
+        .agg(sum($"totalLearningHours").alias("certSumValue"))
+
+      // Join certJoinedDF with enrolmentSumDF for enrolmentSumValue
+      val finalResultDF = certJoinedDF
+        .join(enrolmentDF, $"userOrgID" === $"organisationID", "left_outer")
+        .groupBy("organisationID", "learningSumValue", "loginSumValue", "certSumValue")
+        .agg(sum($"totalLearningHours").alias("enrolmentSumValue"))
+        .withColumn("allIDs", lit(null).cast("string"))
+        .select(col("organisationID").alias("ministryID"), col("allIDs"), col("learningSumValue"), col("loginSumValue"), col("certSumValue"), col("enrolmentSumValue"))
+
+      show(finalResultDF, "finalresult")
+
+      val finaldf2 = finalResultDF
+        .withColumn("learningSumValue", col("learningSumValue").cast("int"))
+        .withColumn("loginSumValue", coalesce(col("loginSumValue").cast("int"), lit(0)))
+        .withColumn("certSumValue", coalesce(col("certSumValue").cast("int"), lit(0)))
+        .withColumn("enrolmentSumValue", coalesce(col("enrolmentSumValue").cast("int"), lit(0)))
+      finaldf2
+    }
+
+    def processDepartmentL2(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
+      val organisationDF = df
+        .join(orgHierarchyCompleteDF, df("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
+        .select(df("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()
+
+      val sumDF = organisationDF
+        .groupBy("departmentID")
+        .agg(
+          concat_ws(",", collect_set(when($"organisationID".isNotNull, $"organisationID"))).alias("orgIDs")
+        )
+        .withColumn("associatedIds", concat_ws(",", $"orgIDs"))
+        .withColumn("allIDs", concat_ws(",", $"departmentID", $"associatedIds"))
+
+      val userJoinedDF = sumDF.withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(userSumDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("departmentID", "allIDs")
+        .agg(sum($"totalLearningHours").alias("learningSumValue"))
+
+      val loginJoinedDF = userJoinedDF
+        .withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(userLoginpercentDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("departmentID", "allIDs", "learningSumValue")
+        .agg(sum($"totalLearningHours").alias("loginSumValue"))
+
+      // Join loginJoinedDF with certSumDF for certSumValue
+      val certJoinedDF = loginJoinedDF
+        .withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(certSumDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("departmentID", "allIDs", "learningSumValue", "loginSumValue")
+        .agg(sum($"totalLearningHours").alias("certSumValue"))
+
+      // Join certJoinedDF with enrolmentSumDF for enrolmentSumValue
+      val finalResultDF = certJoinedDF
+        .withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(enrolmentDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("departmentID", "allIDs", "learningSumValue", "loginSumValue", "certSumValue")
+        .agg(sum($"totalLearningHours").alias("enrolmentSumValue"))
+        .select(col("departmentID").alias("ministryID"), col("allIDs"), col("learningSumValue"), col("loginSumValue"), col("certSumValue"), col("enrolmentSumValue"))
+      show(finalResultDF, "finalresult")
+
+      val finaldf2 = finalResultDF
+        .withColumn("learningSumValue", col("learningSumValue").cast("int"))
+        .withColumn("loginSumValue", coalesce(col("loginSumValue").cast("int"), lit(0)))
+        .withColumn("certSumValue", coalesce(col("certSumValue").cast("int"), lit(0)))
+        .withColumn("enrolmentSumValue", coalesce(col("enrolmentSumValue").cast("int"), lit(0)))
+
+      finaldf2
+    }
+
+    def processMinistryL1(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
+
+      println("Processing Ministry L1 DataFrame:")
+      val departmentAndMapIDsDF = df
+        .join(orgHierarchyCompleteDF, df("ministryMapID") === orgHierarchyCompleteDF("l1mapid"), "left")
+        .select(df("ministryID"), col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
+
+      // Join with orgHierarchyCompleteDF to get the organisationDF
+      val organisationDF = departmentAndMapIDsDF
+        .join(orgHierarchyCompleteDF, departmentAndMapIDsDF("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
+        .select(departmentAndMapIDsDF("ministryID"), departmentAndMapIDsDF("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()
+      show(organisationDF, "hierarchyF")
+
+      val sumDF = organisationDF
+        .groupBy("ministryID")
+        .agg(
+          concat_ws(",", collect_set(when($"departmentID".isNotNull, $"departmentID"))).alias("departmentIDs"),
+          concat_ws(",", collect_set(when($"organisationID".isNotNull, $"organisationID"))).alias("orgIDs")
+        )
+        .withColumn("associatedIds", concat_ws(",", $"departmentIDs", $"orgIDs"))
+        .withColumn("allIDs", concat_ws(",", $"ministryID", $"associatedIds"))
+
+      val userJoinedDF = sumDF.withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(userSumDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("ministryID", "allIDs")
+        .agg(sum($"totalLearningHours").alias("learningSumValue"))
+
+      val loginJoinedDF = userJoinedDF
+        .withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(userLoginpercentDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("ministryID", "allIDs", "learningSumValue")
+        .agg(sum($"totalLearningHours").alias("loginSumValue"))
+
+      // Join loginJoinedDF with certSumDF for certSumValue
+      val certJoinedDF = loginJoinedDF
+        .withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(certSumDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("ministryID", "allIDs", "learningSumValue", "loginSumValue")
+        .agg(sum($"totalLearningHours").alias("certSumValue"))
+
+      // Join certJoinedDF with enrolmentSumDF for enrolmentSumValue
+      val finalResultDF = certJoinedDF
+        .withColumn("orgID", explode(split($"allIDs", ",")))
+        .join(enrolmentDF, $"userOrgID" === $"orgID", "left_outer")
+        .groupBy("ministryID", "allIDs", "learningSumValue", "loginSumValue", "certSumValue")
+        .agg(sum($"totalLearningHours").alias("enrolmentSumValue"))
+        .select(col("ministryID"), col("allIDs"), col("learningSumValue"), col("loginSumValue"), col("certSumValue"), col("enrolmentSumValue"))
+      show(finalResultDF, "finalresult")
+
+      val finaldf2 = finalResultDF
+        .withColumn("learningSumValue", col("learningSumValue").cast("int"))
+        .withColumn("loginSumValue", coalesce(col("loginSumValue").cast("int"), lit(0)))
+        .withColumn("certSumValue", coalesce(col("certSumValue").cast("int"), lit(0)))
+        .withColumn("enrolmentSumValue", coalesce(col("enrolmentSumValue").cast("int"), lit(0)))
+
+      finaldf2
+    }
+
+    // Create DataFrames based on conditions
+    val ministryL1DF = joinedDF.filter(col("sborgtype") === "ministry").select(col("sborgid").alias("ministryID"), col("mapid").alias("ministryMapID"))
+    show(ministryL1DF, "MinistryData")
+    val ministryOrgDF = processMinistryL1(ministryL1DF, userOrgDF, orgHierarchyCompleteDF)
+    val departmentL2DF = joinedDF.filter((col("sborgtype") === "department") || (col("sborgsubtype") === "department")).select(col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
+    show(departmentL2DF, "DepartmentData")
+    val deptOrgDF = processDepartmentL2(departmentL2DF, userOrgDF, orgHierarchyCompleteDF)
+    val orgsL3DF = joinedDF.filter((col("sborgtype") === "mdo") && (col("sborgsubtype") =!= "department")).select(col("sborgid").alias("organisationID"))
+    show(orgsL3DF, "OrgData")
+    val orgsDF = processOrgsL3(orgsL3DF, userOrgDF, orgHierarchyCompleteDF)
+
+    var combinedMinistryMetricsDF = ministryOrgDF.union(deptOrgDF).union(orgsDF)
+
+    show(combinedMinistryMetricsDF, "MinistryMetrics")
+
+    Redis.dispatchDataFrame[Int]("dashboard_rolled_up_user_count", combinedMinistryMetricsDF, "ministryID", "learningSumValue")
+    Redis.dispatchDataFrame[Int]("dashboard_rolled_up_login_percent_last_24_hrs", combinedMinistryMetricsDF, "ministryID", "loginSumValue")
+    Redis.dispatchDataFrame[Int]("dashboard_rolled_up_certificates_generated_count", combinedMinistryMetricsDF, "ministryID", "certSumValue")
+    Redis.dispatchDataFrame[Int]("dashboard_rolled_up_enrolment_content_count", combinedMinistryMetricsDF, "ministryID", "enrolmentSumValue")
+  }
+}
\ No newline at end of file
diff --git a/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala b/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala
index ac9c21e6..2249fac0 100644
--- a/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala
+++ b/batch-models/src/main/scala/org/ekstep/analytics/job/JobFactory.scala
@@ -110,6 +110,10 @@ object JobFactory {
         ObservationQuestionReportJob
       case "observation-status-report" =>
         ObservationStatusReportJob
+      case "ministry-metrics" =>
+        MinistryMetricsJob
+      case "ministry-learner-leaderboard" =>
+        MinistryLearnerLeaderboardJob
       case _ => reflectModule(jobType);
   }