Skip to content

Commit

Permalink
Merge pull request #121 from varshamahuli97/cbrelease-4.8.16
Browse files Browse the repository at this point in the history
rolled up operational report modifications
  • Loading branch information
shishirsuman092 authored Aug 7, 2024
2 parents 127d7e4 + 73748ab commit 61b2959
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1883,5 +1883,67 @@ object DataUtil extends Serializable {
show(df, "ratings")
df
}
def processOrgsL3(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
val organisationDF = df.dropDuplicates()
val sumDF = organisationDF.withColumn("allIDs", lit(null).cast("string")).select(col("organisationID").alias("ministryID"), col("allIDs"))
sumDF

}

def processDepartmentL2(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
val organisationDF = df
.join(orgHierarchyCompleteDF, df("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
.select(df("departmentID"), col("sborgid").alias("organisationID")).dropDuplicates()
val sumDF = organisationDF
.groupBy("departmentID")
.agg(
concat_ws(",", collect_set(when(col("organisationID").isNotNull, col("organisationID")))).alias("orgIDs")
)
.withColumn("associatedIds", concat_ws(",", col("orgIDs")))
.withColumn("allIDs", concat_ws(",", col("departmentID"), col("associatedIds")))
.select(col("departmentID").alias("ministryID"), col("allIDs"))
sumDF

}

def processMinistryL1(df: DataFrame, userOrgDF: DataFrame, orgHierarchyCompleteDF: DataFrame): DataFrame = {
println("Processing Ministry L1 DataFrame:")
val departmentAndMapIDsDF = df
.join(orgHierarchyCompleteDF, df("ministryMapID") === orgHierarchyCompleteDF("l1mapid"), "left")
.select(df("ministryID"), col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))

// Join with orgHierarchyCompleteDF to get the organisationDF
val organisationDF = departmentAndMapIDsDF
.join(orgHierarchyCompleteDF, departmentAndMapIDsDF("departmentMapID") === orgHierarchyCompleteDF("l2mapid"), "left")
.select(departmentAndMapIDsDF("ministryID"), departmentAndMapIDsDF("departmentID"),col("sborgid").alias("organisationID")).dropDuplicates()
val sumDF = organisationDF
.groupBy("ministryID")
.agg(
concat_ws(",", collect_set(when(col("departmentID").isNotNull, col("departmentID")))).alias("departmentIDs"),
concat_ws(",", collect_set(when(col("organisationID").isNotNull, col("organisationID")))).alias("orgIDs")
)
.withColumn("associatedIds", concat_ws(",", col("departmentIDs"), col("orgIDs")))
.withColumn("allIDs", concat_ws(",", col("ministryID"), col("associatedIds")))
.select(col("ministryID"), col("allIDs"))
sumDF
}

def getDetailedHierarchy(userOrgDF: DataFrame)(implicit spark: SparkSession, conf: DashboardConfig): DataFrame = {
var orgHierarchyCompleteDF = orgCompleteHierarchyDataFrame()
var distinctMdoIDsDF = userOrgDF.select("userOrgID").distinct()
val joinedDF = orgHierarchyCompleteDF.join(distinctMdoIDsDF, orgHierarchyCompleteDF("sborgid") === distinctMdoIDsDF("userOrgID"), "inner")
println("The number of distinct orgs in orgHierarchy is: "+joinedDF.count())
val ministryL1DF = joinedDF.filter(col("l1mapid").isNull && col("l2mapid").isNull && col("l3mapid").isNull).select(col("sborgid").alias("ministryID"), col("mapid").alias("ministryMapID"))
val ministryOrgDF = processMinistryL1(ministryL1DF, userOrgDF, orgHierarchyCompleteDF)
val departmentL2DF = joinedDF.filter(col("l2mapid").isNull && col("l1mapid").isNotNull || col("l3mapid").isNotNull).select(col("sborgid").alias("departmentID"), col("mapid").alias("departmentMapID"))
val deptOrgDF = processDepartmentL2(departmentL2DF, userOrgDF, orgHierarchyCompleteDF)
val orgsL3DF = joinedDF.filter((col("l3mapid").isNull) && col("l2mapid").isNotNull && col("l1mapid").isNotNull).select(col("sborgid").alias("organisationID"))
val orgsDF = processOrgsL3(orgsL3DF, userOrgDF, orgHierarchyCompleteDF)
val combinedMinistryMetricsDF = ministryOrgDF.union(deptOrgDF).union(orgsDF)
val updatedDF = combinedMinistryMetricsDF.withColumn("allIDs", when(col("allIDs").isNull || trim(col("allIDs")) === "", concat(lit("$"), col("ministryID"))).otherwise(col("allIDs")))
show(updatedDF, "This will be the final hierarchy")
updatedDF
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object UserACBPReportModel extends AbsDashboardModel {
// get user and org data frames
var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()
val orgHierarchyData = orgHierarchyDataframe()

val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF)
val userDataDF = userOrgDF
.join(orgHierarchyData, Seq("userOrgID"), "left")
.withColumn("designation", coalesce(col("professionalDetails.designation"), lit("")))
Expand Down Expand Up @@ -113,7 +113,10 @@ object UserACBPReportModel extends AbsDashboardModel {

val reportPath = s"${conf.acbpReportPath}/${today}"
generateReport(enrolmentReportDF.drop("mdoid"), s"${reportPath}/CBPEnrollmentReport", fileName="CBPEnrollmentReport")
generateReport(enrolmentReportDF, s"${conf.acbpMdoEnrolmentReportPath}/${today}","mdoid", "CBPEnrollmentReport")
val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF = enrolmentReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
generateReport(filteredCombinedReportDF, s"${conf.acbpMdoEnrolmentReportPath}/${today}","mdoid", "CBPEnrollmentReport")
// to be removed once new security job is created
if (conf.reportSyncEnable) {
syncReports(s"${conf.localReportDir}/${reportPath}", s"${conf.acbpMdoEnrolmentReportPath}/${today}")
Expand Down Expand Up @@ -150,7 +153,10 @@ object UserACBPReportModel extends AbsDashboardModel {
)
show(userSummaryReportDF, "userSummaryReportDF")
generateReport(userSummaryReportDF.drop("mdoid"), s"${reportPath}/CBPUserSummaryReport", fileName="CBPUserSummaryReport")
generateReport(userSummaryReportDF.coalesce(1), s"${conf.acbpMdoSummaryReportPath}/${today}","mdoid", "CBPUserSummaryReport")
val explodedDF1 = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF1 = userSummaryReportDF.join(explodedDF1, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF1 = combinedReportDF1.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
generateReport(filteredCombinedReportDF1.coalesce(1), s"${conf.acbpMdoSummaryReportPath}/${today}","mdoid", "CBPUserSummaryReport")
// to be removed once new security job is created
if(conf.reportSyncEnable) {
syncReports(s"${conf.localReportDir}/${reportPath}", s"${conf.acbpMdoSummaryReportPath}/${today}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object UserAssessmentModel extends AbsDashboardModel {

// obtain user org data
var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()

val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF)
// get course details, with rating info
val (hierarchyDF, allCourseProgramDetailsWithCompDF, allCourseProgramDetailsDF,
allCourseProgramDetailsWithRatingDF) = contentDataFrames(orgDF)
Expand Down Expand Up @@ -94,7 +94,10 @@ object UserAssessmentModel extends AbsDashboardModel {
val reportPath = s"${conf.standaloneAssessmentReportPath}/${today}"
// generateReport(df, s"${reportPath}-full")
df = df.drop("assessID", "assessOrgID")
generateAndSyncReports(df, "mdoid", reportPath, "StandaloneAssessmentReport")
val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF = df.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
generateAndSyncReports(filteredCombinedReportDF, "mdoid", reportPath, "StandaloneAssessmentReport")

Redis.closeRedisConnect()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object BlendedProgramReportModel extends AbsDashboardModel {
// get user and user org data
var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()
val orgHierarchyData = orgHierarchyDataframe()
val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF)

var userDataDF = userOrgDF
.withColumn("userPrimaryEmail", col("personalDetails.primaryEmail"))
Expand Down Expand Up @@ -276,7 +277,10 @@ object BlendedProgramReportModel extends AbsDashboardModel {
.withColumnRenamed("maskedPhone", "Phone_Number")

show(cbpReportDF, "cbpReportDF")
generateAndSyncReports(cbpReportDF, "mdoid", reportPathCBP, "BlendedProgramReport")
val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF = cbpReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
generateAndSyncReports(filteredCombinedReportDF, "mdoid", reportPathCBP, "BlendedProgramReport")


val df_warehouse = fullDF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object CourseBasedAssessmentModel extends AbsDashboardModel {
val today = getDate()

var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()

val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF)
// get course details, with rating info
val (hierarchyDF, allCourseProgramDetailsWithCompDF, allCourseProgramDetailsDF,
allCourseProgramDetailsWithRatingDF) = contentDataFrames(orgDF, Seq("Course", "Program", "Blended Program", "Standalone Assessment", "Curated Program"), runValidation = false)
Expand Down Expand Up @@ -113,7 +113,10 @@ object CourseBasedAssessmentModel extends AbsDashboardModel {
// generateReport(fullReportDF, s"${reportPath}-full")

val mdoReportDF = fullReportDF.drop("assessID", "assessOrgID", "assessChildID", "userOrgID")
generateReport(mdoReportDF, reportPath,"mdoid", "UserAssessmentReport")
val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF = mdoReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
generateReport(filteredCombinedReportDF, reportPath,"mdoid", "UserAssessmentReport")
// to be removed once new security job is created
if (conf.reportSyncEnable) {
syncReports(s"${conf.localReportDir}/${reportPath}", reportPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ object CourseReportModel extends AbsDashboardModel {
val today = getDate()

val orgDF = orgDataFrame()

var (organisationDF, userDF, userOrgDF) = getOrgUserDataFrames()
val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF)
// Get course data first
val allCourseProgramDetailsDF = contentWithOrgDetailsDataFrame(orgDF, Seq("Course", "Program", "Blended Program", "CuratedCollections", "Curated Program"))
// Get content resource hierarchy
Expand Down Expand Up @@ -183,7 +184,10 @@ object CourseReportModel extends AbsDashboardModel {

// generateReport(fullReportDF, s"${reportPath}-full")
val mdoReportDF = fullReportDF.drop("courseID", "courseOrgID")
generateReport(mdoReportDF, reportPath,"mdoid", "ContentReport")
val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF = mdoReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
generateReport(filteredCombinedReportDF, reportPath,"mdoid", "ContentReport")
// to be removed once new security job is created
if (conf.reportSyncEnable) {
syncReports(s"${conf.localReportDir}/${reportPath}", reportPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ object UserEnrolmentModel extends AbsDashboardModel {

//GET ORG DATA
var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()
val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF)
val orgHierarchyData = orgHierarchyDataframe()
val userDataDF = userOrgDF
.join(orgHierarchyData, Seq("userOrgID"), "left")
Expand Down Expand Up @@ -142,7 +143,10 @@ object UserEnrolmentModel extends AbsDashboardModel {
// generateReport(fullReportDF, s"${reportPath}-full")

val mdoReportDF = fullReportDF.drop("userID", "userOrgID", "courseID", "courseOrgID", "issuedCertificateCount", "courseStatus", "resourceCount", "resourcesConsumed", "rawCompletionPercentage")
generateReport(mdoReportDF, reportPath, "mdoid","ConsumptionReport")
val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF = mdoReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
generateReport(filteredCombinedReportDF, reportPath, "mdoid","ConsumptionReport")
// to be removed once new security job is created
if (conf.reportSyncEnable) {
syncReports(s"${conf.localReportDir}/${reportPath}", reportPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ object UserReportModel extends AbsDashboardModel {
var (orgDF, userDF, userOrgDF) = getOrgUserDataFrames()

val orgHierarchyData = orgHierarchyDataframe()
val actualHierarchyDataFrame = getDetailedHierarchy(userOrgDF)
var weeklyClapsDF = learnerStatsDataFrame()
var karmaPointsDF = userKarmaPointsSummaryDataFrame()
karmaPointsDF = karmaPointsDF.withColumnRenamed("userid", "userID")
Expand Down Expand Up @@ -64,10 +65,13 @@ object UserReportModel extends AbsDashboardModel {
.coalesce(1)

val reportPath = s"${conf.userReportPath}/${today}"
// generateReport(fullReportDF, s"${reportPath}-full")
//generateReport(fullReportDF, s"${reportPath}-full")
val mdoWiseReportDF = fullReportDF.drop("userID", "userOrgID", "userCreatedBy")

generateReport(mdoWiseReportDF, reportPath,"mdoid", "UserReport")
val explodedDF = actualHierarchyDataFrame.withColumn("mdoid", explode(split(col("allIDs"), ","))).filter(trim(col("mdoid")) =!= "" && col("mdoid").isNotNull).drop("allIDs").dropDuplicates("mdoid")
val combinedReportDF = mdoWiseReportDF.join(explodedDF, Seq("mdoid"), "left").withColumn("ministryID", coalesce(col("ministryID"), col("mdoid")))
val filteredCombinedReportDF = combinedReportDF.drop("mdoid").withColumnRenamed("ministryID", "mdoid").coalesce(1)
// Repartition by mdo_id and write to CSV
generateReport(filteredCombinedReportDF, reportPath,"mdoid", "UserReport")
// to be removed once new security job is created
if (conf.reportSyncEnable) {
syncReports(s"${conf.localReportDir}/${reportPath}", reportPath)
Expand Down

0 comments on commit 61b2959

Please sign in to comment.