Skip to content

Commit

Permalink
Remove duplicates from NPS and modify monthly active users logic
Browse files Browse the repository at this point in the history
  • Loading branch information
varshamahuli97 committed May 19, 2024
1 parent c024bdb commit f7092ec
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ object NpsModel extends AbsDashboardModel {
val druidData2 = npsTriggerC2DataFrame() // gives data from druid for users who have either completed 1 course or have more than 30 telemetry events
val mongodbData = npsTriggerC3DataFrame() // gives the data from mongoDB for the users who have posted at least 1 discussion

val df = druidData2.union(mongodbData)
var df = druidData2.union(mongodbData)
df = df.dropDuplicates("userid")
val druidData1Count = druidData1.count()
println(s"DataFrame Count for set of users who have submitted the form: $druidData1Count")
val druidData2Count = druidData2.count()
Expand All @@ -38,7 +39,8 @@ object NpsModel extends AbsDashboardModel {
val existingFeedCount = cassandraDF.count()
println(s"DataFrame Count for users who have feed data: $existingFeedCount")
val storeToCassandraDF = filteredDF.except(cassandraDF)
val filteredStoreToCassandraDF = storeToCassandraDF.filter(col("userid").isNotNull && col("userid") =!= "" && col("userid") =!= "''")
var filteredStoreToCassandraDF = storeToCassandraDF.filter(col("userid").isNotNull && col("userid") =!= "" && col("userid") =!= "''")
filteredStoreToCassandraDF = filteredStoreToCassandraDF.dropDuplicates("userid")
val finalFeedCount = filteredStoreToCassandraDF.count()
println(s"DataFrame Count for final set of users to create feed: $finalFeedCount")
// create an additional dataframe that has the columns of the user_feed table, as we have to insert these userIds into the user_feed table
Expand Down Expand Up @@ -67,7 +69,7 @@ object NpsModel extends AbsDashboardModel {
// write the dataframe to cassandra user_feed_backup table
additionalDF.write
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "sunbird_notifications" , "table" -> "notification_feed_backup"))
.options(Map("keyspace" -> "sunbird_notifications" , "table" -> "notification_feed_history"))
.mode("append")
.save()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object SummaryRedisSyncModel extends AbsDashboardModel {
// (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\"
// WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2)
val averageMonthlyActiveUsersDF = averageMonthlyActiveUsersDataFrame()
val averageMonthlyActiveUsersCount = averageMonthlyActiveUsersDF.groupBy().agg(expr("CASE WHEN COUNT(*) > 0 THEN CAST(AVG(DAUOutput) AS LONG) ELSE 0 END").alias("count")).first().getLong(0)
val averageMonthlyActiveUsersCount = averageMonthlyActiveUsersDF.select("activeCount").first().getLong(0)
Redis.update("lp_monthly_active_users", averageMonthlyActiveUsersCount.toString)
println(s"lp_monthly_active_users = ${averageMonthlyActiveUsersCount}")

Expand Down Expand Up @@ -88,11 +88,11 @@ object SummaryRedisSyncModel extends AbsDashboardModel {
}

def averageMonthlyActiveUsersDataFrame()(implicit spark: SparkSession, conf: DashboardConfig) : DataFrame = {
val query = """SELECT ROUND(AVG(daily_count * 1.0), 1) as DAUOutput FROM (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\" WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2)"""
val query = """SELECT COUNT(DISTINCT(uid)) as activeCount FROM \"summary-events\" WHERE dimensions_type='app' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY"""
var df = druidDFOption(query, conf.sparkDruidRouterHost).orNull
if (df == null) return emptySchemaDataFrame(Schema.monthlyActiveUsersSchema)

df = df.withColumn("DAUOutput", expr("CAST(DAUOutput as LONG)")) // Important to cast as long otherwise a cast will fail later on
df = df.withColumn("activeCount", expr("CAST(activeCount as LONG)")) // Important to cast as long otherwise a cast will fail later on

show(df)
df
Expand Down

0 comments on commit f7092ec

Please sign in to comment.