From f7092ec6f780ed53d81db3805f911199bd7ed62d Mon Sep 17 00:00:00 2001 From: Varsha Mahuli Date: Sun, 19 May 2024 10:24:57 +0530 Subject: [PATCH] Remove duplicates from NPS and modify monthly active users logic --- .../ekstep/analytics/dashboard/survey/nps/NpsModel.scala | 8 +++++--- .../dashboard/telemetry/SummaryRedisSyncModel.scala | 6 +++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/survey/nps/NpsModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/survey/nps/NpsModel.scala index 31056753..1faefa68 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/survey/nps/NpsModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/survey/nps/NpsModel.scala @@ -20,7 +20,8 @@ object NpsModel extends AbsDashboardModel { val druidData2 = npsTriggerC2DataFrame() // gives data from druid for users who have either completed 1 course or have more than 30 telemetry events val mongodbData = npsTriggerC3DataFrame() // gives the data from mongoDB for the users who have posted atleast 1 discussion - val df = druidData2.union(mongodbData) + var df = druidData2.union(mongodbData) + df = df.dropDuplicates("userid") val druidData1Count = druidData1.count() println(s"DataFrame Count for set of users who have submitted the form: $druidData1Count") val druidData2Count = druidData2.count() @@ -38,7 +39,8 @@ object NpsModel extends AbsDashboardModel { val existingFeedCount = cassandraDF.count() println(s"DataFrame Count for users who have feed data: $existingFeedCount") val storeToCassandraDF = filteredDF.except(cassandraDF) - val filteredStoreToCassandraDF = storeToCassandraDF.filter(col("userid").isNotNull && col("userid") =!= "" && col("userid") =!= "''") + var filteredStoreToCassandraDF = storeToCassandraDF.filter(col("userid").isNotNull && col("userid") =!= "" && col("userid") =!= "''") + filteredStoreToCassandraDF = filteredStoreToCassandraDF.dropDuplicates("userid") val finalFeedCount = filteredStoreToCassandraDF.count() println(s"DataFrame Count for final set of users to create feed: $finalFeedCount") //create an additional dataframe that has columns of user_feed table as we have to insert thes userIDS to user_feed table @@ -67,7 +69,7 @@ object NpsModel extends AbsDashboardModel { // write the dataframe to cassandra user_feed_backup table additionalDF.write .format("org.apache.spark.sql.cassandra") - .options(Map("keyspace" -> "sunbird_notifications" , "table" -> "notification_feed_backup")) + .options(Map("keyspace" -> "sunbird_notifications" , "table" -> "notification_feed_history")) .mode("append") .save() } diff --git a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala index 6fd0ab08..251ec679 100644 --- a/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala +++ b/batch-models/src/main/scala/org/ekstep/analytics/dashboard/telemetry/SummaryRedisSyncModel.scala @@ -45,7 +45,7 @@ object SummaryRedisSyncModel extends AbsDashboardModel { // (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\" // WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2) val averageMonthlyActiveUsersDF = averageMonthlyActiveUsersDataFrame() - val averageMonthlyActiveUsersCount = averageMonthlyActiveUsersDF.groupBy().agg(expr("CASE WHEN COUNT(*) > 0 THEN CAST(AVG(DAUOutput) AS LONG) ELSE 0 END").alias("count")).first().getLong(0) + val averageMonthlyActiveUsersCount = averageMonthlyActiveUsersDF.select("activeCount").first().getLong(0) Redis.update("lp_monthly_active_users", averageMonthlyActiveUsersCount.toString) println(s"lp_monthly_active_users = ${averageMonthlyActiveUsersCount}") @@ -88,11 +88,11 @@ object SummaryRedisSyncModel extends AbsDashboardModel { } def averageMonthlyActiveUsersDataFrame()(implicit spark: SparkSession, conf: DashboardConfig) : DataFrame = { - val query = """SELECT ROUND(AVG(daily_count * 1.0), 1) as DAUOutput FROM (SELECT COUNT(DISTINCT(actor_id)) AS daily_count, TIME_FLOOR(__time + INTERVAL '05:30' HOUR TO MINUTE, 'P1D') AS day_start FROM \"telemetry-events-syncts\" WHERE eid='IMPRESSION' AND actor_type='User' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY GROUP BY 2)""" + val query = """SELECT COUNT(DISTINCT(uid)) as activeCount FROM \"summary-events\" WHERE dimensions_type='app' AND __time > CURRENT_TIMESTAMP - INTERVAL '30' DAY""" var df = druidDFOption(query, conf.sparkDruidRouterHost).orNull if (df == null) return emptySchemaDataFrame(Schema.monthlyActiveUsersSchema) - df = df.withColumn("DAUOutput", expr("CAST(DAUOutput as LONG)")) // Important to cast as long otherwise a cast will fail later on + df = df.withColumn("activeCount", expr("CAST(activeCount as LONG)")) // Important to cast as long otherwise a cast will fail later on show(df) df