Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get engagement for a video #9

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions server/lib/mave_metrics/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ defmodule MaveMetrics.API do
end
end

def get_engagement(%{"video" => query}, interval, timeframe) do
interval = interval || @default_interval
timeframe = timeframe || @default_timeframe

video_id =
Video
|> where([v], fragment("? @> ?", v.metadata, ^query))
|> select([v], v.id)
|> Repo.one()

if video_id == nil do
[]
else
query_individual_video_engagement(video_id, timeframe, interval)
end
end

def query_aggregated_video_metrics(video_ids, timeframe, min_watched_seconds, interval) do
"daily_session_aggregation"
|> apply_timeframe(timeframe)
Expand Down Expand Up @@ -136,23 +153,69 @@ defmodule MaveMetrics.API do
|> Repo.all()
end

defp apply_timeframe(query, %{"from" => from_timestamp, "to" => to_timestamp}) do
def query_individual_video_engagement(video_id, timeframe, interval) do
result =
"video_views_per_second_per_day_aggregate"
|> where([d], d.video_id == ^video_id)
|> apply_timeframe(timeframe, :event_date)
|> group_by([d], [
fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date),
d.video_second
])
|> select([d], %{
interval: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date),
second: d.video_second,
views: type(sum(d.views), :integer)
})
|> order_by([d],
asc: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date),
asc: d.video_second
)
|> Repo.all()

nest_engagement_data_by_interval(result)
end

defp nest_engagement_data_by_interval(results) do
results
|> Enum.group_by(& &1.interval)
|> Enum.map(fn {interval, engagements} ->
%{
interval: interval,
per_second:
Enum.map(engagements, fn %{second: second, views: views} ->
%{second: second, views: views}
end)
}
end)
end

defp apply_timeframe(query, timeframe, date_field \\ :session_date)

defp apply_timeframe(
query,
%{"from" => from_timestamp, "to" => to_timestamp},
date_field
) do
from = DateTime.from_unix!(from_timestamp) |> Timex.to_date()
to = DateTime.from_unix!(to_timestamp) |> Timex.to_date()

query
|> where([d], d.session_date >= ^from and d.session_date <= ^to)
|> where([d], field(d, ^date_field) >= ^from and field(d, ^date_field) <= ^to)
end

defp apply_timeframe(query, timeframe) when is_number(timeframe) do
defp apply_timeframe(query, timeframe, date_field) when is_number(timeframe) do
from = DateTime.from_unix!(timeframe) |> Timex.to_date()

query
|> where([d], d.session_date >= ^from)
|> where([d], field(d, ^date_field) >= ^from)
end

defp apply_timeframe(query, timeframe) when is_binary(timeframe) do
defp apply_timeframe(query, timeframe, date_field) when is_binary(timeframe) do
query
|> where([d], d.session_date >= fragment(~s|now() - interval '?'|, literal(^timeframe)))
|> where(
[d],
field(d, ^date_field) >= fragment(~s|now() - interval '?'|, literal(^timeframe))
)
end
end
1 change: 1 addition & 0 deletions server/lib/mave_metrics/stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ defmodule MaveMetrics.Stats do

def refresh_daily_aggregation() do
Repo.query!("REFRESH MATERIALIZED VIEW daily_session_aggregation;")
Repo.query!("REFRESH MATERIALIZED VIEW video_views_per_second_aggregate;")
end

defp float_video_time(attrs) do
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule MaveMetricsWeb.API.EngagementController do
use MaveMetricsWeb, :controller

alias MaveMetrics.API

def get_engagement(conn, _params) do
{:ok, body, _conn} = Plug.Conn.read_body(conn)
params = Jason.decode!(body)
conn |> engagement(params)
end

def engagement(conn, %{"query" => query} = params) do
result = API.get_engagement(query, params["interval"], params["timeframe"])

conn
|> json(%{engagement: result})
|> halt
end

def engagement(conn, _params) do
conn
|> put_status(400)
|> json(%{error: "Requires a valid JSON query struct."})
|> halt
end
end
24 changes: 7 additions & 17 deletions server/lib/mave_metrics_web/controllers/api/sources_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,14 @@ defmodule MaveMetricsWeb.API.SourcesController do
conn |> sources(params)
end

def sources(conn, %{"identifier" => identifier, "query" => query} = params) do
result = API.get_sources({identifier, query}, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{sources: result})
|> halt
end

def sources(conn, %{"identifier" => identifier} = params) do
result = API.get_sources(identifier, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{sources: result})
|> halt
end

def sources(conn, %{"query" => filters} = params) do
result = API.get_sources(filters, params["interval"], params["timeframe"], params["minimum_watch_seconds"])
result =
API.get_sources(
filters,
params["interval"],
params["timeframe"],
params["minimum_watch_seconds"]
)

conn
|> json(%{sources: result})
Expand Down
24 changes: 7 additions & 17 deletions server/lib/mave_metrics_web/controllers/api/views_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,14 @@ defmodule MaveMetricsWeb.API.ViewsController do
conn |> views(params)
end

def views(conn, %{"identifier" => identifier, "query" => query} = params) do
result = API.get_plays({identifier, query}, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{views: result})
|> halt
end

def views(conn, %{"identifier" => identifier} = params) do
result = API.get_plays(identifier, params["interval"], params["timeframe"], params["minimum_watch_seconds"])

conn
|> json(%{views: result})
|> halt
end

def views(conn, %{"query" => filters} = params) do
result = API.get_plays(filters, params["interval"], params["timeframe"], params["minimum_watch_seconds"])
result =
API.get_plays(
filters,
params["interval"],
params["timeframe"],
params["minimum_watch_seconds"]
)

conn
|> json(%{views: result})
Expand Down
2 changes: 2 additions & 0 deletions server/lib/mave_metrics_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ defmodule MaveMetricsWeb.Router do
get("/views", API.ViewsController, :get_views)
post("/sources", API.SourcesController, :sources)
get("/sources", API.SourcesController, :get_sources)
post("/engagement", API.EngagementController, :engagement)
get("/engagement", API.EngagementController, :get_engagement)

post("/keys", API.KeysController, :create_key)
get("/keys", API.KeysController, :get_keys)
Expand Down
32 changes: 32 additions & 0 deletions server/priv/repo/migrations/20240226221750_add_engagement.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule MaveMetrics.Repo.Migrations.AddEngagement do
use Ecto.Migration

def change do
execute("""
CREATE MATERIALIZED VIEW IF NOT EXISTS video_views_per_second_per_day_aggregate AS
SELECT
e.video_id,
DATE(e.timestamp) AS event_date,
gs.second AS video_second,
COUNT(*) AS views
FROM
events e
JOIN
events e_next ON e.session_id = e_next.session_id
AND e_next.type = 'pause'
AND e_next.timestamp > e.timestamp
AND e.type = 'play',
LATERAL
generate_series(
floor(e.video_time)::int,
ceil(e_next.video_time)::int - 1
) AS gs(second)
GROUP BY
e.video_id, DATE(e.timestamp), gs.second;
""")

execute("""
CREATE INDEX ON video_views_per_second_per_day_aggregate (video_id, event_date, video_second);
""")
end
end
Loading