diff --git a/server/lib/mave_metrics/api.ex b/server/lib/mave_metrics/api.ex index d1ddeff..8354df9 100644 --- a/server/lib/mave_metrics/api.ex +++ b/server/lib/mave_metrics/api.ex @@ -48,6 +48,23 @@ defmodule MaveMetrics.API do end end + def get_engagement(%{"video" => query}, interval, timeframe) do + interval = interval || @default_interval + timeframe = timeframe || @default_timeframe + + video_id = + Video + |> where([v], fragment("? @> ?", v.metadata, ^query)) + |> select([v], v.id) + |> Repo.one() + + if video_id == nil do + [] + else + query_individual_video_engagement(video_id, timeframe, interval) + end + end + def query_aggregated_video_metrics(video_ids, timeframe, min_watched_seconds, interval) do "daily_session_aggregation" |> apply_timeframe(timeframe) @@ -136,23 +153,69 @@ defmodule MaveMetrics.API do |> Repo.all() end - defp apply_timeframe(query, %{"from" => from_timestamp, "to" => to_timestamp}) do + def query_individual_video_engagement(video_id, timeframe, interval) do + result = + "video_views_per_second_per_day_aggregate" + |> where([d], d.video_id == ^video_id) + |> apply_timeframe(timeframe, :event_date) + |> group_by([d], [ + fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date), + d.video_second + ]) + |> select([d], %{ + interval: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date), + second: d.video_second, + views: type(sum(d.views), :integer) + }) + |> order_by([d], + asc: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.event_date), + asc: d.video_second + ) + |> Repo.all() + + nest_engagement_data_by_interval(result) + end + + defp nest_engagement_data_by_interval(results) do + results + |> Enum.group_by(& &1.interval) + |> Enum.map(fn {interval, engagements} -> + %{ + interval: interval, + per_second: + Enum.map(engagements, fn %{second: second, views: views} -> + %{second: second, views: views} + end) + } + end) + end + + defp apply_timeframe(query, timeframe, date_field \\ :session_date) + + defp apply_timeframe( + query, + %{"from" => from_timestamp, "to" => to_timestamp}, + date_field + ) do from = DateTime.from_unix!(from_timestamp) |> Timex.to_date() to = DateTime.from_unix!(to_timestamp) |> Timex.to_date() query - |> where([d], d.session_date >= ^from and d.session_date <= ^to) + |> where([d], field(d, ^date_field) >= ^from and field(d, ^date_field) <= ^to) end - defp apply_timeframe(query, timeframe) when is_number(timeframe) do + defp apply_timeframe(query, timeframe, date_field) when is_number(timeframe) do from = DateTime.from_unix!(timeframe) |> Timex.to_date() query - |> where([d], d.session_date >= ^from) + |> where([d], field(d, ^date_field) >= ^from) end - defp apply_timeframe(query, timeframe) when is_binary(timeframe) do + defp apply_timeframe(query, timeframe, date_field) when is_binary(timeframe) do query - |> where([d], d.session_date >= fragment(~s|now() - interval '?'|, literal(^timeframe))) + |> where( + [d], + field(d, ^date_field) >= fragment(~s|now() - interval '?'|, literal(^timeframe)) + ) end end diff --git a/server/lib/mave_metrics/stats.ex b/server/lib/mave_metrics/stats.ex index 9aef4a4..504c75c 100644 --- a/server/lib/mave_metrics/stats.ex +++ b/server/lib/mave_metrics/stats.ex @@ -112,6 +112,7 @@ defmodule MaveMetrics.Stats do def refresh_daily_aggregation() do Repo.query!("REFRESH MATERIALIZED VIEW daily_session_aggregation;") + Repo.query!("REFRESH MATERIALIZED VIEW video_views_per_second_aggregate;") end defp float_video_time(attrs) do diff --git a/server/lib/mave_metrics_web/controllers/api/engagement_controller.ex b/server/lib/mave_metrics_web/controllers/api/engagement_controller.ex new file mode 100644 index 0000000..6a0d033 --- /dev/null +++ b/server/lib/mave_metrics_web/controllers/api/engagement_controller.ex @@ -0,0 +1,26 @@ +defmodule MaveMetricsWeb.API.EngagementController do + use MaveMetricsWeb, :controller + + alias MaveMetrics.API + + def get_engagement(conn, _params) do + {:ok, body, _conn} = Plug.Conn.read_body(conn) + params = Jason.decode!(body) + conn |> engagement(params) + end + + def engagement(conn, %{"query" => query} = params) do + result = API.get_engagement(query, params["interval"], params["timeframe"]) + + conn + |> json(%{engagement: result}) + |> halt + end + + def engagement(conn, _params) do + conn + |> put_status(400) + |> json(%{error: "Requires a valid JSON query struct."}) + |> halt + end +end diff --git a/server/lib/mave_metrics_web/controllers/api/sources_controller.ex b/server/lib/mave_metrics_web/controllers/api/sources_controller.ex index 971a06d..9ccc7b3 100644 --- a/server/lib/mave_metrics_web/controllers/api/sources_controller.ex +++ b/server/lib/mave_metrics_web/controllers/api/sources_controller.ex @@ -9,24 +9,14 @@ defmodule MaveMetricsWeb.API.SourcesController do conn |> sources(params) end - def sources(conn, %{"identifier" => identifier, "query" => query} = params) do - result = API.get_sources({identifier, query}, params["interval"], params["timeframe"], params["minimum_watch_seconds"]) - - conn - |> json(%{sources: result}) - |> halt - end - - def sources(conn, %{"identifier" => identifier} = params) do - result = API.get_sources(identifier, params["interval"], params["timeframe"], params["minimum_watch_seconds"]) - - conn - |> json(%{sources: result}) - |> halt - end - def sources(conn, %{"query" => filters} = params) do - result = API.get_sources(filters, params["interval"], params["timeframe"], params["minimum_watch_seconds"]) + result = + API.get_sources( + filters, + params["interval"], + params["timeframe"], + params["minimum_watch_seconds"] + ) conn |> json(%{sources: result}) diff --git a/server/lib/mave_metrics_web/controllers/api/views_controller.ex b/server/lib/mave_metrics_web/controllers/api/views_controller.ex index 5488d0a..22fe21d 100644 --- a/server/lib/mave_metrics_web/controllers/api/views_controller.ex +++ b/server/lib/mave_metrics_web/controllers/api/views_controller.ex @@ -9,24 +9,14 @@ defmodule MaveMetricsWeb.API.ViewsController do conn |> views(params) end - def views(conn, %{"identifier" => identifier, "query" => query} = params) do - result = API.get_plays({identifier, query}, params["interval"], params["timeframe"], params["minimum_watch_seconds"]) - - conn - |> json(%{views: result}) - |> halt - end - - def views(conn, %{"identifier" => identifier} = params) do - result = API.get_plays(identifier, params["interval"], params["timeframe"], params["minimum_watch_seconds"]) - - conn - |> json(%{views: result}) - |> halt - end - def views(conn, %{"query" => filters} = params) do - result = API.get_plays(filters, params["interval"], params["timeframe"], params["minimum_watch_seconds"]) + result = + API.get_plays( + filters, + params["interval"], + params["timeframe"], + params["minimum_watch_seconds"] + ) conn |> json(%{views: result}) diff --git a/server/lib/mave_metrics_web/router.ex b/server/lib/mave_metrics_web/router.ex index d50faf6..5209fc4 100644 --- a/server/lib/mave_metrics_web/router.ex +++ b/server/lib/mave_metrics_web/router.ex @@ -24,6 +24,8 @@ defmodule MaveMetricsWeb.Router do get("/views", API.ViewsController, :get_views) post("/sources", API.SourcesController, :sources) get("/sources", API.SourcesController, :get_sources) + post("/engagement", API.EngagementController, :engagement) + get("/engagement", API.EngagementController, :get_engagement) post("/keys", API.KeysController, :create_key) get("/keys", API.KeysController, :get_keys) diff --git a/server/priv/repo/migrations/20240226221750_add_engagement.exs b/server/priv/repo/migrations/20240226221750_add_engagement.exs new file mode 100644 index 0000000..56b45cb --- /dev/null +++ b/server/priv/repo/migrations/20240226221750_add_engagement.exs @@ -0,0 +1,32 @@ +defmodule MaveMetrics.Repo.Migrations.AddEngagement do + use Ecto.Migration + + def change do + execute(""" + CREATE MATERIALIZED VIEW IF NOT EXISTS video_views_per_second_per_day_aggregate AS + SELECT + e.video_id, + DATE(e.timestamp) AS event_date, + gs.second AS video_second, + COUNT(*) AS views + FROM + events e + JOIN + events e_next ON e.session_id = e_next.session_id + AND e_next.type = 'pause' + AND e_next.timestamp > e.timestamp + AND e.type = 'play', + LATERAL + generate_series( + floor(e.video_time)::int, + ceil(e_next.video_time)::int - 1 + ) AS gs(second) + GROUP BY + e.video_id, DATE(e.timestamp), gs.second; + """) + + execute(""" + CREATE INDEX ON video_views_per_second_per_day_aggregate (video_id, event_date, video_second); + """) + end +end