Skip to content

Commit

Permalink
Change play to duration to accomodate rebuffering
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvanleeuwen committed Oct 12, 2023
1 parent 62a016c commit 3137183
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 171 deletions.
195 changes: 121 additions & 74 deletions lib/mave_metrics/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,103 +17,150 @@ defmodule MaveMetrics.API do

@ttl :timer.seconds(30)

alias MaveMetrics.Session.Play

@decorate cacheable(cache: Cache, key: {Play, "plays" <> key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)}, opts: [ttl: @ttl])
alias MaveMetrics.Session.Duration

@decorate cacheable(
cache: Cache,
key:
{Duration,
"plays" <>
key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)},
opts: [ttl: @ttl]
)
def get_plays(query, interval, timeframe, minimum_watch_seconds) do
interval = interval || @default_interval
timeframe = timeframe || @default_timeframe
minimum_watch_seconds = minimum_watch_seconds || @default_minimum_watch_seconds

result = Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([p, s, v], [fragment(~s|time_bucket('?', ?)|, literal(^interval), p.timestamp), p.session_id, s.platform, s.device_type, s.browser_type])
|> select_details(interval)
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> group_by([e], e.interval)
|> format_output()
|> Repo.all()
result =
Duration
|> where([d], d.type == :play)
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([d, s, v], [
fragment(~s|time_bucket('?', ?)|, literal(^interval), d.timestamp),
d.session_id,
s.platform,
s.device_type,
s.browser_type
])
|> select_details(interval)
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> group_by([e], e.interval)
|> format_output()
|> Repo.all()

result
end

@decorate cacheable(cache: Cache, key: {Play, "engagement" <> key(query) <> key(timeframe) <> key(ranges)}, opts: [ttl: @ttl])
@decorate cacheable(
cache: Cache,
key: {Duration, "engagement" <> key(query) <> key(timeframe) <> key(ranges)},
opts: [ttl: @ttl]
)
def get_engagement(query, timeframe, ranges) do
timeframe = timeframe || @default_timeframe
ranges = ranges || @default_ranges

last_point =
Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
Duration
|> where([d], d.type == :play)
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> select([p, s, v], max(p.to))
|> select([d, s, v], max(d.to))
|> Repo.one()

if is_nil(last_point) do
[]
else
part = last_point / ranges

{:ok, result} = Repo.transaction(fn ->
0 .. ranges - 1 |> Enum.map(fn m ->
from_moment = part * m
to_moment = part * (m + 1)

result = Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> select([p, s, v], %{from: p.from, to: p.to})
|> subquery()
|> where([e], e.from >= ^from_moment and e.from <= ^from_moment) # starts within range
|> or_where([e], e.from < ^from_moment and e.to > ^to_moment) # starts before range and ends after range
|> or_where([e], e.from < ^from_moment and e.to >= ^from_moment and e.to <= ^to_moment) # starts before range and ends within range
|> Repo.all()

%{range: m, viewers: result |> Enum.count, range_time: %{from: from_moment, to: to_moment}}
end) |> Enum.sort(&(&1.range < &2.range))
end)
{:ok, result} =
Repo.transaction(fn ->
0..(ranges - 1)
|> Enum.map(fn m ->
from_moment = part * m
to_moment = part * (m + 1)

result =
Duration
|> where([d], d.type == :play)
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> select([d, s, v], %{from: d.from, to: d.to})
|> subquery()
# starts within range
|> where([e], e.from >= ^from_moment and e.from <= ^from_moment)
# starts before range and ends after range
|> or_where([e], e.from < ^from_moment and e.to > ^to_moment)
# starts before range and ends within range
|> or_where(
[e],
e.from < ^from_moment and e.to >= ^from_moment and e.to <= ^to_moment
)
|> Repo.all()

%{
range: m,
viewers: result |> Enum.count(),
range_time: %{from: from_moment, to: to_moment}
}
end)
|> Enum.sort(&(&1.range < &2.range))
end)

result
end
end

@decorate cacheable(cache: Cache, key: {Play, "source" <> key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)}, opts: [ttl: @ttl])
@decorate cacheable(
cache: Cache,
key:
{Duration,
"source" <>
key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)},
opts: [ttl: @ttl]
)
def get_sources(query, interval, timeframe, minimum_watch_seconds) do
interval = interval || @default_interval
timeframe = timeframe || @default_timeframe
minimum_watch_seconds = minimum_watch_seconds || @default_minimum_watch_seconds

result = Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([p, s, v], [fragment(~s|time_bucket('?', ?)|, literal(^interval), p.timestamp), p.session_id, v.source_uri])
|> select([p, s, v], %{
host: fragment(~s|(? ->> ?)|, v.source_uri, "host"),
path: fragment(~s|(? ->> ?)|, v.source_uri, "path"),
interval: fragment(~s|time_bucket('?', ?)|, literal(^interval), p.timestamp),
elapsed_time: sum(p.elapsed_time)
})
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> having([e], sum(e.elapsed_time) >= ^minimum_watch_seconds)
|> group_by([e], [e.host, e.path, e.interval])
|> select([e], %{
interval: e.interval,
host: e.host,
path: e.path,
views: count(e.interval)
})
|> Repo.all()
result =
Duration
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([d, s, v], [
fragment(~s|time_bucket('?', ?)|, literal(^interval), d.timestamp),
d.session_id,
v.source_uri
])
|> select([d, s, v], %{
host: fragment(~s|(? ->> ?)|, v.source_uri, "host"),
path: fragment(~s|(? ->> ?)|, v.source_uri, "path"),
interval: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.timestamp),
elapsed_time: sum(d.elapsed_time)
})
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> having([e], sum(e.elapsed_time) >= ^minimum_watch_seconds)
|> group_by([e], [e.host, e.path, e.interval])
|> select([e], %{
interval: e.interval,
host: e.host,
path: e.path,
views: count(e.interval)
})
|> Repo.all()

result
end
Expand All @@ -129,22 +176,22 @@ defmodule MaveMetrics.API do
end

defp where_query(q, %{"video" => video_query} = _query) do
q |> where([p, s, v], fragment("? @> ?", v.metadata, ^video_query))
q |> where([d, s, v], fragment("? @> ?", v.metadata, ^video_query))
end

defp where_query(q, %{"session" => session_query} = _query) do
q |> where([p, s, v], fragment("? @> ?", s.metadata, ^session_query))
q |> where([d, s, v], fragment("? @> ?", s.metadata, ^session_query))
end

defp where_query(q, query) do
q |> where([p, s, v], v.identifier == ^query)
q |> where([d, s, v], v.identifier == ^query)
end

defp select_details(query, interval) do
query
|> select([p, s, v], %{
interval: fragment("time_bucket('?', ?)", literal(^interval), p.timestamp),
elapsed_time: sum(p.elapsed_time),
|> select([d, s, v], %{
interval: fragment("time_bucket('?', ?)", literal(^interval), d.timestamp),
elapsed_time: sum(d.elapsed_time),
platform_mac: case_when(s.platform == :mac, 1, 0),
platform_ios: case_when(s.platform == :ios, 1, 0),
platform_android: case_when(s.platform == :android, 1, 0),
Expand Down Expand Up @@ -204,20 +251,20 @@ defmodule MaveMetrics.API do
{:ok, to} = DateTime.from_unix(to_timestamp)

query
|> where([p, s, v], p.timestamp >= ^from)
|> where([p, s, v], p.timestamp <= ^to)
|> where([d, s, v], d.timestamp >= ^from)
|> where([d, s, v], d.timestamp <= ^to)
end

defp where_timeframe(query, timeframe) when is_number(timeframe) do
{:ok, from} = DateTime.from_unix(timeframe)

query
|> where([p, s, v], p.timestamp >= ^from)
|> where([d, s, v], d.timestamp >= ^from)
end

defp where_timeframe(query, timeframe) do
query
|> where([p, s, v], p.timestamp >= fragment("now() - interval '?'", literal(^timeframe)))
|> where([d, s, v], d.timestamp >= fragment("now() - interval '?'", literal(^timeframe)))
end

defp key(%{} = query) do
Expand Down
3 changes: 2 additions & 1 deletion lib/mave_metrics/keys.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule MaveMetrics.Keys do

def revoke_key(%{"key" => key}) do
with %Key{} = key <- Key |> where([k], k.key == ^key) |> Repo.one(),
{:ok, _} <- key |> Key.changeset(%{disabled_at: DateTime.utc_now()}) |> Repo.update() do
{:ok, _} <- key |> Key.changeset(%{disabled_at: DateTime.utc_now()}) |> Repo.update() do
:ok
else
_ -> :error
Expand All @@ -51,6 +51,7 @@ defmodule MaveMetrics.Keys do
|> case do
{:ok, result} ->
{:ok, result.key}

{:error, reason} ->
{:error, reason}
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
defmodule MaveMetrics.Session.Play do
defmodule MaveMetrics.Session.Duration do
use Ecto.Schema
import Ecto.Changeset

@primary_key false
@foreign_key_type :binary_id

@required_fields ~w(timestamp from)a
@optional_fields ~w(session_id to elapsed_time)a
@optional_fields ~w(session_id to elapsed_time type)a

schema "plays" do
schema "durations" do
field :timestamp, :utc_datetime_usec, primary_key: true
belongs_to :session, MaveMetrics.Session, primary_key: true

field :type, Ecto.Enum, values: [:play, :rebuffering, :fullscreen]

field :from, :float
field :to, :float

Expand Down
12 changes: 9 additions & 3 deletions lib/mave_metrics/session/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ defmodule MaveMetrics.Session do
@optional_fields ~w(browser_type platform device_type metadata)a

schema "sessions" do
field :browser_type, Ecto.Enum, values: [:edge, :ie, :chrome, :firefox, :opera, :safari, :brave, :other], default: :other
field :platform, Ecto.Enum, values: [:ios, :android, :mac, :windows, :linux, :other], default: :other
field :browser_type, Ecto.Enum,
values: [:edge, :ie, :chrome, :firefox, :opera, :safari, :brave, :other],
default: :other

field :platform, Ecto.Enum,
values: [:ios, :android, :mac, :windows, :linux, :other],
default: :other

field :device_type, Ecto.Enum, values: [:mobile, :tablet, :desktop, :other], default: :other

belongs_to :video, MaveMetrics.Video
belongs_to :key, MaveMetrics.Key

has_many :events, MaveMetrics.Session.Event
has_many :sources, MaveMetrics.Session.Source
has_many :plays, MaveMetrics.Session.Play
has_many :durations, MaveMetrics.Session.Duration

field :metadata, :map

Expand Down
Loading

0 comments on commit 3137183

Please sign in to comment.