Skip to content

Commit

Permalink
Get engagement for a video
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvanleeuwen authored and wietsehage committed Feb 27, 2024
1 parent 67174c5 commit f03a7d3
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 40 deletions.
75 changes: 69 additions & 6 deletions server/lib/mave_metrics/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ defmodule MaveMetrics.API do
end
end

def get_engagement(%{"video" => query}, interval, timeframe) do
interval = interval || @default_interval
timeframe = timeframe || @default_timeframe

video_id =
Video
|> where([v], fragment("? @> ?", v.metadata, ^query))
|> select([v], v.id)
|> Repo.one()

if video_id == nil do
[]
else
query_individual_video_engagement(video_id, timeframe, interval)
end
end

def query_aggregated_video_metrics(video_ids, timeframe, min_watched_seconds, interval) do
"daily_session_aggregation"
|> apply_timeframe(timeframe)
Expand Down Expand Up @@ -136,23 +153,69 @@ defmodule MaveMetrics.API do
|> Repo.all()
end

defp apply_timeframe(query, %{"from" => from_timestamp, "to" => to_timestamp}) do
def query_individual_video_engagement(video_id, timeframe, interval) do
result =
"video_views_per_second_per_day_aggregate"
|> where([d], d.video_id == ^video_id)
|> apply_timeframe(timeframe, :event_date)
|> group_by([d], [
fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date),
d.video_second
])
|> select([d], %{
interval: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date),
second: d.video_second,
views: type(sum(d.views), :integer)
})
|> order_by([d],
asc: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date),
asc: d.video_second
)
|> Repo.all()

nest_engagement_data_by_interval(result)
end

defp nest_engagement_data_by_interval(results) do
results
|> Enum.group_by(& &1.interval)
|> Enum.map(fn {interval, engagements} ->
%{
interval: interval,
per_second:
Enum.map(engagements, fn %{second: second, views: views} ->
%{second: second, views: views}
end)
}
end)
end

defp apply_timeframe(query, timeframe, date_field \\ :session_date)

defp apply_timeframe(
query,
%{"from" => from_timestamp, "to" => to_timestamp},
date_field
) do
from = DateTime.from_unix!(from_timestamp) |> Timex.to_date()
to = DateTime.from_unix!(to_timestamp) |> Timex.to_date()

query
|> where([d], d.session_date >= ^from and d.session_date <= ^to)
|> where([d], field(d, ^date_field) >= ^from and field(d, ^date_field) <= ^to)
end

defp apply_timeframe(query, timeframe) when is_number(timeframe) do
defp apply_timeframe(query, timeframe, date_field) when is_number(timeframe) do
from = DateTime.from_unix!(timeframe) |> Timex.to_date()

query
|> where([d], d.session_date >= ^from)
|> where([d], field(d, ^date_field) >= ^from)
end

defp apply_timeframe(query, timeframe) when is_binary(timeframe) do
defp apply_timeframe(query, timeframe, date_field) when is_binary(timeframe) do
query
|> where([d], d.session_date >= fragment(~s|now() - interval '?'|, literal(^timeframe)))
|> where(
[d],
field(d, ^date_field) >= fragment(~s|now() - interval '?'|, literal(^timeframe))
)
end
end
1 change: 1 addition & 0 deletions server/lib/mave_metrics/stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ defmodule MaveMetrics.Stats do

def refresh_daily_aggregation() do
Repo.query!("REFRESH MATERIALIZED VIEW daily_session_aggregation;")
Repo.query!("REFRESH MATERIALIZED VIEW video_views_per_second_aggregate;")
end

defp float_video_time(attrs) do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule MaveMetricsWeb.API.EngagementController do
use MaveMetricsWeb, :controller

alias MaveMetrics.API

def get_engagement(conn, _params) do
{:ok, body, _conn} = Plug.Conn.read_body(conn)
params = Jason.decode!(body)
conn |> engagement(params)
end

def engagement(conn, %{"query" => query} = params) do
result = API.get_engagement(query, params["interval"], params["timeframe"])

conn
|> json(%{engagement: result})
|> halt
end

def engagement(conn, _params) do
conn
|> put_status(400)
|> json(%{error: "Requires a valid JSON query struct."})
|> halt
end
end
24 changes: 7 additions & 17 deletions server/lib/mave_metrics_web/controllers/api/sources_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,14 @@ defmodule MaveMetricsWeb.API.SourcesController do
conn |> sources(params)
end

def sources(conn, %{"identifier" => identifier, "query" => query} = params) do
result = API.get_sources({identifier, query}, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{sources: result})
|> halt
end

def sources(conn, %{"identifier" => identifier} = params) do
result = API.get_sources(identifier, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{sources: result})
|> halt
end

def sources(conn, %{"query" => filters} = params) do
result = API.get_sources(filters, params["interval"], params["timeframe"], params["minimum_watch_seconds"])
result =
API.get_sources(
filters,
params["interval"],
params["timeframe"],
params["minimum_watch_seconds"]
)

conn
|> json(%{sources: result})
Expand Down
24 changes: 7 additions & 17 deletions server/lib/mave_metrics_web/controllers/api/views_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,14 @@ defmodule MaveMetricsWeb.API.ViewsController do
conn |> views(params)
end

def views(conn, %{"identifier" => identifier, "query" => query} = params) do
result = API.get_plays({identifier, query}, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{views: result})
|> halt
end

def views(conn, %{"identifier" => identifier} = params) do
result = API.get_plays(identifier, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{views: result})
|> halt
end

def views(conn, %{"query" => filters} = params) do
result = API.get_plays(filters, params["interval"], params["timeframe"], params["minimum_watch_seconds"])
result =
API.get_plays(
filters,
params["interval"],
params["timeframe"],
params["minimum_watch_seconds"]
)

conn
|> json(%{views: result})
Expand Down
2 changes: 2 additions & 0 deletions server/lib/mave_metrics_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ defmodule MaveMetricsWeb.Router do
get("/views", API.ViewsController, :get_views)
post("/sources", API.SourcesController, :sources)
get("/sources", API.SourcesController, :get_sources)
post("/engagement", API.EngagementController, :engagement)
get("/engagement", API.EngagementController, :get_engagement)

post("/keys", API.KeysController, :create_key)
get("/keys", API.KeysController, :get_keys)
Expand Down
32 changes: 32 additions & 0 deletions server/priv/repo/migrations/20240226221750_add_engagement.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule MaveMetrics.Repo.Migrations.AddEngagement do
use Ecto.Migration

def change do
execute("""
CREATE MATERIALIZED VIEW IF NOT EXISTS video_views_per_second_per_day_aggregate AS
SELECT
e.video_id,
DATE(e.timestamp) AS event_date,
gs.second AS video_second,
COUNT(*) AS views
FROM
events e
JOIN
events e_next ON e.session_id = e_next.session_id
AND e_next.type = 'pause'
AND e_next.timestamp > e.timestamp
AND e.type = 'play',
LATERAL
generate_series(
floor(e.video_time)::int,
ceil(e_next.video_time)::int - 1
) AS gs(second)
GROUP BY
e.video_id, DATE(e.timestamp), gs.second;
""")

execute("""
CREATE INDEX ON video_views_per_second_per_day_aggregate (video_id, event_date, video_second);
""")
end
end

0 comments on commit f03a7d3

Please sign in to comment.