Skip to content

Commit

Permalink
Merge pull request #118 from varshamahuli97/cbrelease-4.8.16
Browse files Browse the repository at this point in the history
Ministry level metrics and leaderboard on MDO channel : KB-5794
  • Loading branch information
shishirsuman092 authored Jul 26, 2024
2 parents 0c07cd6 + 2b099b1 commit 0344cc2
Show file tree
Hide file tree
Showing 9 changed files with 608 additions and 187 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ case class DashboardConfig (
cassandraKarmaPointsSummaryTable: String,
cassandraLearnerLeaderBoardTable: String,
cassandraLearnerLeaderBoardLookupTable: String,
cassandraMDOLearnerLeaderboardTable: String,

//warehouse tables;
appPostgresHost: String,
Expand Down Expand Up @@ -267,6 +268,7 @@ object DashboardConfigParser extends Serializable {
cassandraKarmaPointsSummaryTable = getConfigModelParam(config, "cassandraKarmaPointsSummaryTable"),
cassandraLearnerLeaderBoardTable = getConfigModelParam(config, "cassandraLearnerLeaderBoardTable"),
cassandraLearnerLeaderBoardLookupTable = getConfigModelParam(config, "cassandraLearnerLeaderBoardLookupTable"),
cassandraMDOLearnerLeaderboardTable = getConfigModelParam(config, "cassandraMDOLearnerLeaderboardTable"),

// redis keys
redisRegisteredOfficerCountKey = "mdo_registered_officer_count",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ object DataUtil extends Serializable {
roleDF
}

/**
 * Returns the complete org hierarchy DataFrame previously persisted to the
 * cache by the data-exhaust job (key: "orgCompleteHierarchy").
 */
def orgCompleteHierarchyDataFrame()(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
  cache.load("orgCompleteHierarchy")
}

/**
*
* @param userOrgDF DataFrame(userID, firstName, lastName, maskedEmail, userStatus, userOrgID, userOrgName, userOrgStatus)
Expand Down Expand Up @@ -1026,7 +1031,6 @@ object DataUtil extends Serializable {
.withColumn("courseEnrolledTimestamp", col("enrolled_date"))
.withColumn("lastContentAccessTimestamp", col("lastcontentaccesstime"))
.withColumn("issuedCertificateCount", size(col("issued_certificates")))
.withColumn("issuedCertificateCountPerContent", when(size(col("issued_certificates")) > 0, lit(1)).otherwise( lit(0)))
.withColumn("certificateGeneratedOn", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("lastIssuedOn")))
.withColumn("firstCompletedOn", when(col("issued_certificates").isNull, "").otherwise(when(size(col("issued_certificates")) > 0, col("issued_certificates")(0).getItem("lastIssuedOn")).otherwise("")))
.withColumn("certificateID", when(col("issued_certificates").isNull, "").otherwise( col("issued_certificates")(size(col("issued_certificates")) - 1).getItem("identifier")))
Expand Down Expand Up @@ -1538,7 +1542,7 @@ object DataUtil extends Serializable {
* Reading user_karma_points data
*/
/**
 * Reads user_karma_points data.
 *
 * Loads the cached copy written by DataExhaustModel (key: "userKarmaPoints")
 * instead of re-reading the Cassandra table directly.
 * NOTE: the previous direct Cassandra read was left alongside the cache read,
 * producing a duplicate `val df` definition; only the cache read is kept.
 */
def userKarmaPointsDataFrame()(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): DataFrame = {
  val df = cache.load("userKarmaPoints")
  show(df, "Karma Points data")
  df
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ object DataExhaustModel extends AbsDashboardModel {
.repartition(16)
show(orgHierarchyDF, "orgHierarchyDF")
cache.write(orgHierarchyDF, "orgHierarchy")
show(orgPostgresDF, "orgCompleteHierarchyDF")
cache.write(orgPostgresDF, "orgCompleteHierarchy")

val userDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraUserTable)
show(userDF, "userDF")
Expand All @@ -108,8 +110,13 @@ object DataExhaustModel extends AbsDashboardModel {
show(learnerLeaderboardDF, "learnerLeaderboardDF")
cache.write(learnerLeaderboardDF, "learnerLeaderBoard")

val userKarmaPointsDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraKarmaPointsTable)
show(userKarmaPointsDF, "Karma Points data")
cache.write(userKarmaPointsDF, "userKarmaPoints")

val userKarmaPointsSummaryDF = cassandraTableAsDataFrame(conf.cassandraUserKeyspace, conf.cassandraKarmaPointsSummaryTable)
show(userKarmaPointsSummaryDF, "userKarmaPointsSummaryDF")
cache.write(userKarmaPointsSummaryDF, "userKarmaPointsSummary")
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.ekstep.analytics.dashboard.ministryleaderboard

import org.apache.spark.SparkContext
import org.ekstep.analytics.framework.util.JobLogger
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobDriver}

object MinistryLearnerLeaderboardJob extends optional.Application with IJob {

  implicit val className = "org.ekstep.analytics.dashboard.ministryleaderboard.MinistryLearnerLeaderboardJob"

  /** Scheduler entry point: delegates batch execution to MinistryLearnerLeaderboardModel. */
  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
    // orNull: the framework may or may not supply a SparkContext
    implicit val sparkContext: SparkContext = sc.orNull
    JobLogger.log("Started executing Job")
    JobDriver.run("batch", config, MinistryLearnerLeaderboardModel)
    JobLogger.log("Job Completed.")
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package org.ekstep.analytics.dashboard.ministryleaderboard

import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.ekstep.analytics.dashboard.DashboardUtil._
import org.ekstep.analytics.dashboard.DataUtil._
import org.ekstep.analytics.dashboard.{AbsDashboardModel, DashboardConfig}
import org.ekstep.analytics.framework.FrameworkContext


object MinistryLearnerLeaderboardModel extends AbsDashboardModel {

  // fixed: previously carried a stray ".leaderboard." segment that did not match this package
  implicit val className: String = "org.ekstep.analytics.dashboard.ministryleaderboard.MinistryLearnerLeaderboardModel"

  override def name() = "MinistryLearnerLeaderboardModel"

  /**
   * Builds the previous month's karma-points leaderboard for every MDO channel at
   * ministry (L1), department (L2) and organisation (L3) level, ranks users per
   * org by total points, and writes the result to the Cassandra
   * MDO learner-leaderboard table.
   *
   * @param timestamp batch timestamp supplied by the framework (unused here)
   */
  def processData(timestamp: Long)(implicit spark: SparkSession, sc: SparkContext, fc: FrameworkContext, conf: DashboardConfig): Unit = {

    // user and user-org data; only userOrgDF is consumed below
    val (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()
    val orgHierarchyCompleteDF = orgCompleteHierarchyDataFrame()
    val distinctMdoIDsDF = userOrgDF.select("userOrgID").distinct()
    println("The number of distinct MDO ids is: " + distinctMdoIDsDF.count())

    // keep only hierarchy rows whose org is an MDO channel
    val joinedDF = orgHierarchyCompleteDF.join(distinctMdoIDsDF, orgHierarchyCompleteDF("sborgid") === distinctMdoIDsDF("userOrgID"), "inner")
    println("The number of distinct orgs in orgHierarchy is: " + joinedDF.count())
    show(joinedDF, "orgHierarchyCompletedDF")

    // previous month window [monthStart, monthEnd]
    val monthStart = date_format(date_trunc("MONTH", add_months(current_date(), -1)), "yyyy-MM-dd HH:mm:ss")
    val monthEnd = date_format(last_day(add_months(current_date(), -1)), "yyyy-MM-dd 23:59:59")

    // `month` evaluates to the CURRENT month number (day after last day of previous
    // month), so (month - 1) below yields the previous month; `year` is the
    // previous month's year
    val (month, year) = (
      date_format(date_add(last_day(add_months(current_date(), -1)), 1), "M"),
      date_format(add_months(current_date(), -1), "yyyy")
    )

    // karma points earned in the previous month, aggregated per user
    val karmaPointsDataDF = userKarmaPointsDataFrame()
      .filter(col("credit_date") >= monthStart && col("credit_date") <= monthEnd)
      .groupBy(col("userid")).agg(sum(col("points")).alias("total_points"), max(col("credit_date")).alias("last_credit_date"))
    show(karmaPointsDataDF, "this is the kp_data")

    // L3: users belonging directly to a plain organisation (non-department MDO)
    def processOrgsL3(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
      val organisationDF = df.dropDuplicates()
      val userOrgData = userOrgDF.join(organisationDF, userOrgDF("userOrgID") === organisationDF("organisationID"), "inner")
        .select(col("userID"), col("organisationID").alias("userParentID"), col("professionalDetails.designation").alias("designation"),
          col("userProfileImgUrl"), col("fullName"), col("userOrgName")).distinct()

      userOrgData
    }

    // L2: users of a department or of any organisation mapped under it (via l2mapid)
    def processDepartmentL2(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
      val organisationDF = df
        .join(orgHierarchyCompleteDF, df("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "inner")
        .select(df("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()

      val userOrgData = userOrgDF.join(organisationDF, (userOrgDF("userOrgID") === organisationDF("departmentID")) ||
        (userOrgDF("userOrgID") === organisationDF("organisationID")), "inner")
        .select(col("userID"), col("departmentID").alias("userParentID"), col("professionalDetails.designation").alias("designation"),
          col("userProfileImgUrl"), col("fullName"), col("userOrgName")).distinct()

      userOrgData
    }

    // L1: users of a ministry, its departments (l1mapid) and their organisations (l2mapid)
    // fixed: the parameter was named orgHierarchyCompletedDF and never used — the body
    // captured the outer DataFrame instead; it is now the parameter that is joined on
    def processMinistryL1(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
      println("Processing Ministry L1 DataFrame:")
      val departmentAndMapIDsDF = df
        .join(orgHierarchyCompleteDF, df("ministryMapID") === orgHierarchyCompleteDF("l1mapid"), "left")
        .select(df("ministryID"), col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))

      // Join with orgHierarchyCompleteDF to get the organisationDF
      val organisationDF = departmentAndMapIDsDF
        .join(orgHierarchyCompleteDF, departmentAndMapIDsDF("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
        .select(departmentAndMapIDsDF("ministryID"), departmentAndMapIDsDF("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()

      val userOrgData = userOrgDF.join(organisationDF, (userOrgDF("userOrgID") === organisationDF("ministryID")) ||
        (userOrgDF("userOrgID") === organisationDF("departmentID")) || (userOrgDF("userOrgID") === organisationDF("organisationID")), "inner")
        .select(col("userID"), col("ministryID").alias("userParentID"), col("professionalDetails.designation").alias("designation"),
          col("userProfileImgUrl"), col("fullName"), col("userOrgName")).distinct()

      userOrgData
    }

    // Create per-level DataFrames based on sborgtype
    val ministryL1DF = joinedDF.filter(col("sborgtype") === "ministry").select(col("sborgid").alias("ministryID"), col("mapid").alias("ministryMapID"))
    show(ministryL1DF, "MinistryData") // label typo fixed (was "MinsitryData")
    val ministryOrgDF = processMinistryL1(ministryL1DF, userOrgDF, orgHierarchyCompleteDF)
    val departmentL2DF = joinedDF.filter(col("sborgtype") === "department").select(col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
    show(departmentL2DF, "DepartmentData")
    val deptOrgDF = processDepartmentL2(departmentL2DF, userOrgDF, orgHierarchyCompleteDF)
    // =!= replaces the deprecated !== Column operator (same semantics)
    val orgsL3DF = joinedDF.filter((col("sborgtype") === "mdo") && (col("sborgsubtype") =!= "department")).select(col("sborgid").alias("organisationID"))
    show(orgsL3DF, "OrgData")
    val orgsDF = processOrgsL3(orgsL3DF, userOrgDF, orgHierarchyCompleteDF)

    val userOrgData = ministryOrgDF.union(deptOrgDF).union(orgsDF)

    // attach previous-month karma points; drop users with no parent org
    var userLeaderBoardDataDF = userOrgData.join(karmaPointsDataDF, userOrgData("userID") === karmaPointsDataDF("userid"), "inner")
      .filter(col("userParentID") =!= "")
      .select(userOrgData("userID").alias("userid"),
        userOrgData("userParentID").alias("org_id"),
        userOrgData("fullName").alias("fullname"),
        userOrgData("userProfileImgUrl").alias("profile_image"),
        userOrgData("userOrgName").alias("org_name"),
        userOrgData("designation"),
        karmaPointsDataDF("total_points"),
        karmaPointsDataDF("last_credit_date"))
      .withColumn("month", (month - 1).cast("int"))
      .withColumn("year", lit(year))

    // dense rank by total points within each org
    val windowSpecRank = Window.partitionBy("org_id").orderBy(desc("total_points"))
    userLeaderBoardDataDF = userLeaderBoardDataDF.withColumn("rank", dense_rank().over(windowSpecRank))
    userLeaderBoardDataDF.show(false)

    // stable row number within each org: by rank, most recent credit first
    val windowSpecRow = Window.partitionBy("org_id").orderBy(col("rank"), col("last_credit_date").desc)
    userLeaderBoardDataDF = userLeaderBoardDataDF.withColumn("row_num", row_number.over(windowSpecRow))

    // write to cassandra ministry learner leaderboard
    writeToCassandra(userLeaderBoardDataDF, conf.cassandraUserKeyspace, conf.cassandraMDOLearnerLeaderboardTable)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.ekstep.analytics.dashboard.ministrymetrics

import org.apache.spark.SparkContext
import org.ekstep.analytics.framework.util.JobLogger
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobDriver}

object MinistryMetricsJob extends optional.Application with IJob {

  implicit val className = "org.ekstep.analytics.dashboard.ministrymetrics.MinistryMetricsJob"

  /** Scheduler entry point: delegates batch execution to MinistryMetricsModel. */
  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
    // orNull: the framework may or may not supply a SparkContext
    implicit val sparkContext: SparkContext = sc.orNull
    JobLogger.log("Started executing Job")
    JobDriver.run("batch", config, MinistryMetricsModel)
    JobLogger.log("Job Completed.")
  }
}
Loading

0 comments on commit 0344cc2

Please sign in to comment.