Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change play to duration to accomodate rebuffering #3

Merged
merged 1 commit into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 121 additions & 74 deletions lib/mave_metrics/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,103 +17,150 @@ defmodule MaveMetrics.API do

@ttl :timer.seconds(30)

alias MaveMetrics.Session.Play

@decorate cacheable(cache: Cache, key: {Play, "plays" <> key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)}, opts: [ttl: @ttl])
alias MaveMetrics.Session.Duration

@decorate cacheable(
cache: Cache,
key:
{Duration,
"plays" <>
key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)},
opts: [ttl: @ttl]
)
def get_plays(query, interval, timeframe, minimum_watch_seconds) do
interval = interval || @default_interval
timeframe = timeframe || @default_timeframe
minimum_watch_seconds = minimum_watch_seconds || @default_minimum_watch_seconds

result = Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([p, s, v], [fragment(~s|time_bucket('?', ?)|, literal(^interval), p.timestamp), p.session_id, s.platform, s.device_type, s.browser_type])
|> select_details(interval)
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> group_by([e], e.interval)
|> format_output()
|> Repo.all()
result =
Duration
|> where([d], d.type == :play)
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([d, s, v], [
fragment(~s|time_bucket('?', ?)|, literal(^interval), d.timestamp),
d.session_id,
s.platform,
s.device_type,
s.browser_type
])
|> select_details(interval)
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> group_by([e], e.interval)
|> format_output()
|> Repo.all()

result
end

@decorate cacheable(cache: Cache, key: {Play, "engagement" <> key(query) <> key(timeframe) <> key(ranges)}, opts: [ttl: @ttl])
@decorate cacheable(
cache: Cache,
key: {Duration, "engagement" <> key(query) <> key(timeframe) <> key(ranges)},
opts: [ttl: @ttl]
)
def get_engagement(query, timeframe, ranges) do
timeframe = timeframe || @default_timeframe
ranges = ranges || @default_ranges

last_point =
Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
Duration
|> where([d], d.type == :play)
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> select([p, s, v], max(p.to))
|> select([d, s, v], max(d.to))
|> Repo.one()

if is_nil(last_point) do
[]
else
part = last_point / ranges

{:ok, result} = Repo.transaction(fn ->
0 .. ranges - 1 |> Enum.map(fn m ->
from_moment = part * m
to_moment = part * (m + 1)

result = Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> select([p, s, v], %{from: p.from, to: p.to})
|> subquery()
|> where([e], e.from >= ^from_moment and e.from <= ^from_moment) # starts within range
|> or_where([e], e.from < ^from_moment and e.to > ^to_moment) # starts before range and ends after range
|> or_where([e], e.from < ^from_moment and e.to >= ^from_moment and e.to <= ^to_moment) # starts before range and ends within range
|> Repo.all()

%{range: m, viewers: result |> Enum.count, range_time: %{from: from_moment, to: to_moment}}
end) |> Enum.sort(&(&1.range < &2.range))
end)
{:ok, result} =
Repo.transaction(fn ->
0..(ranges - 1)
|> Enum.map(fn m ->
from_moment = part * m
to_moment = part * (m + 1)

result =
Duration
|> where([d], d.type == :play)
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> select([d, s, v], %{from: d.from, to: d.to})
|> subquery()
# starts within range
|> where([e], e.from >= ^from_moment and e.from <= ^from_moment)
# starts before range and ends after range
|> or_where([e], e.from < ^from_moment and e.to > ^to_moment)
# starts before range and ends within range
|> or_where(
[e],
e.from < ^from_moment and e.to >= ^from_moment and e.to <= ^to_moment
)
|> Repo.all()

%{
range: m,
viewers: result |> Enum.count(),
range_time: %{from: from_moment, to: to_moment}
}
end)
|> Enum.sort(&(&1.range < &2.range))
end)

result
end
end

@decorate cacheable(cache: Cache, key: {Play, "source" <> key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)}, opts: [ttl: @ttl])
@decorate cacheable(
cache: Cache,
key:
{Duration,
"source" <>
key(query) <> key(interval) <> key(timeframe) <> key(minimum_watch_seconds)},
opts: [ttl: @ttl]
)
def get_sources(query, interval, timeframe, minimum_watch_seconds) do
interval = interval || @default_interval
timeframe = timeframe || @default_timeframe
minimum_watch_seconds = minimum_watch_seconds || @default_minimum_watch_seconds

result = Play
|> join(:left, [p], s in assoc(p, :session))
|> join(:left, [p, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([p, s, v], [fragment(~s|time_bucket('?', ?)|, literal(^interval), p.timestamp), p.session_id, v.source_uri])
|> select([p, s, v], %{
host: fragment(~s|(? ->> ?)|, v.source_uri, "host"),
path: fragment(~s|(? ->> ?)|, v.source_uri, "path"),
interval: fragment(~s|time_bucket('?', ?)|, literal(^interval), p.timestamp),
elapsed_time: sum(p.elapsed_time)
})
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> having([e], sum(e.elapsed_time) >= ^minimum_watch_seconds)
|> group_by([e], [e.host, e.path, e.interval])
|> select([e], %{
interval: e.interval,
host: e.host,
path: e.path,
views: count(e.interval)
})
|> Repo.all()
result =
Duration
|> join(:left, [d], s in assoc(d, :session))
|> join(:left, [d, s], v in assoc(s, :video))
|> where_query(query)
|> where_timeframe(timeframe)
|> group_by([d, s, v], [
fragment(~s|time_bucket('?', ?)|, literal(^interval), d.timestamp),
d.session_id,
v.source_uri
])
|> select([d, s, v], %{
host: fragment(~s|(? ->> ?)|, v.source_uri, "host"),
path: fragment(~s|(? ->> ?)|, v.source_uri, "path"),
interval: fragment(~s|time_bucket('?', ?)|, literal(^interval), d.timestamp),
elapsed_time: sum(d.elapsed_time)
})
|> subquery()
|> where([e], e.elapsed_time >= ^minimum_watch_seconds)
|> having([e], sum(e.elapsed_time) >= ^minimum_watch_seconds)
|> group_by([e], [e.host, e.path, e.interval])
|> select([e], %{
interval: e.interval,
host: e.host,
path: e.path,
views: count(e.interval)
})
|> Repo.all()

result
end
Expand All @@ -129,22 +176,22 @@ defmodule MaveMetrics.API do
end

defp where_query(q, %{"video" => video_query} = _query) do
q |> where([p, s, v], fragment("? @> ?", v.metadata, ^video_query))
q |> where([d, s, v], fragment("? @> ?", v.metadata, ^video_query))
end

defp where_query(q, %{"session" => session_query} = _query) do
q |> where([p, s, v], fragment("? @> ?", s.metadata, ^session_query))
q |> where([d, s, v], fragment("? @> ?", s.metadata, ^session_query))
end

defp where_query(q, query) do
q |> where([p, s, v], v.identifier == ^query)
q |> where([d, s, v], v.identifier == ^query)
end

defp select_details(query, interval) do
query
|> select([p, s, v], %{
interval: fragment("time_bucket('?', ?)", literal(^interval), p.timestamp),
elapsed_time: sum(p.elapsed_time),
|> select([d, s, v], %{
interval: fragment("time_bucket('?', ?)", literal(^interval), d.timestamp),
elapsed_time: sum(d.elapsed_time),
platform_mac: case_when(s.platform == :mac, 1, 0),
platform_ios: case_when(s.platform == :ios, 1, 0),
platform_android: case_when(s.platform == :android, 1, 0),
Expand Down Expand Up @@ -204,20 +251,20 @@ defmodule MaveMetrics.API do
{:ok, to} = DateTime.from_unix(to_timestamp)

query
|> where([p, s, v], p.timestamp >= ^from)
|> where([p, s, v], p.timestamp <= ^to)
|> where([d, s, v], d.timestamp >= ^from)
|> where([d, s, v], d.timestamp <= ^to)
end

defp where_timeframe(query, timeframe) when is_number(timeframe) do
{:ok, from} = DateTime.from_unix(timeframe)

query
|> where([p, s, v], p.timestamp >= ^from)
|> where([d, s, v], d.timestamp >= ^from)
end

defp where_timeframe(query, timeframe) do
query
|> where([p, s, v], p.timestamp >= fragment("now() - interval '?'", literal(^timeframe)))
|> where([d, s, v], d.timestamp >= fragment("now() - interval '?'", literal(^timeframe)))
end

defp key(%{} = query) do
Expand Down
3 changes: 2 additions & 1 deletion lib/mave_metrics/keys.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule MaveMetrics.Keys do

def revoke_key(%{"key" => key}) do
with %Key{} = key <- Key |> where([k], k.key == ^key) |> Repo.one(),
{:ok, _} <- key |> Key.changeset(%{disabled_at: DateTime.utc_now()}) |> Repo.update() do
{:ok, _} <- key |> Key.changeset(%{disabled_at: DateTime.utc_now()}) |> Repo.update() do
:ok
else
_ -> :error
Expand All @@ -51,6 +51,7 @@ defmodule MaveMetrics.Keys do
|> case do
{:ok, result} ->
{:ok, result.key}

{:error, reason} ->
{:error, reason}
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
defmodule MaveMetrics.Session.Play do
defmodule MaveMetrics.Session.Duration do
use Ecto.Schema
import Ecto.Changeset

@primary_key false
@foreign_key_type :binary_id

@required_fields ~w(timestamp from)a
@optional_fields ~w(session_id to elapsed_time)a
@optional_fields ~w(session_id to elapsed_time type)a

schema "plays" do
schema "durations" do
field :timestamp, :utc_datetime_usec, primary_key: true
belongs_to :session, MaveMetrics.Session, primary_key: true

field :type, Ecto.Enum, values: [:play, :rebuffering, :fullscreen]

field :from, :float
field :to, :float

Expand Down
12 changes: 9 additions & 3 deletions lib/mave_metrics/session/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ defmodule MaveMetrics.Session do
@optional_fields ~w(browser_type platform device_type metadata)a

schema "sessions" do
field :browser_type, Ecto.Enum, values: [:edge, :ie, :chrome, :firefox, :opera, :safari, :brave, :other], default: :other
field :platform, Ecto.Enum, values: [:ios, :android, :mac, :windows, :linux, :other], default: :other
field :browser_type, Ecto.Enum,
values: [:edge, :ie, :chrome, :firefox, :opera, :safari, :brave, :other],
default: :other

field :platform, Ecto.Enum,
values: [:ios, :android, :mac, :windows, :linux, :other],
default: :other

field :device_type, Ecto.Enum, values: [:mobile, :tablet, :desktop, :other], default: :other

belongs_to :video, MaveMetrics.Video
belongs_to :key, MaveMetrics.Key

has_many :events, MaveMetrics.Session.Event
has_many :sources, MaveMetrics.Session.Source
has_many :plays, MaveMetrics.Session.Play
has_many :durations, MaveMetrics.Session.Duration

field :metadata, :map

Expand Down
Loading
Loading