Skip to content

Commit

Permalink
Add endpoint /views/internal/server/now (#194)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Jun 20, 2024
1 parent fb26320 commit b4b7663
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 11 deletions.
27 changes: 27 additions & 0 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (h *apiHandler) viewershipHandler() chi.Router {
With(h.cache(false)).
MethodFunc("GET", fmt.Sprintf(`/{%s}/total`, assetIDParam), h.getTotalViews)

// realtime viewership server side (internal-only)
h.withMetrics(router, "query_realtime_server_viewership").
MethodFunc("GET", "/internal/server/now", h.queryRealtimeServerViewership())

// total views public API
h.withMetrics(router, "query_total_viewership").
With(h.cache(false)).
Expand Down Expand Up @@ -558,6 +562,29 @@ func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) {
respondJson(rw, http.StatusOK, totalViews)
}

func (h *apiHandler) queryRealtimeServerViewership() http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {
userId := r.URL.Query().Get("userId")
if userId == "" {
respondError(rw, http.StatusBadRequest, errors.New("userId is required"))
return
}

if !isCallerAdmin(r) {
respondError(rw, http.StatusForbidden, errors.New("only admins can query server-side viewership"))
return
}

metrics, err := h.views.QueryRealtimeServerViews(r.Context(), userId)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

respondJson(rw, http.StatusOK, metrics)
}
}

func (h *apiHandler) getStreamHealth(rw http.ResponseWriter, r *http.Request) {
respondJson(rw, http.StatusOK, getStreamStatus(r))
}
Expand Down
10 changes: 10 additions & 0 deletions views/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ func (c *Client) Deprecated_GetTotalViews(ctx context.Context, id string) ([]Tot
}}, nil
}

func (c *Client) QueryRealtimeServerViews(ctx context.Context, userId string) ([]Metric, error) {
viewCount, err := c.prom.QueryRealtimeViews(ctx, userId)
if err != nil {
return nil, fmt.Errorf("error querying start views: %w", err)
}
return []Metric{{
ViewCount: viewCount,
}}, nil
}

func (c *Client) QuerySummary(ctx context.Context, playbackID string) (*Metric, error) {
summary, err := c.bigquery.QueryViewsSummary(ctx, playbackID)
if err != nil {
Expand Down
31 changes: 20 additions & 11 deletions views/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ func NewPrometheus(config promClient.Config) (*Prometheus, error) {

func (c *Prometheus) QueryStartViews(ctx context.Context, asset *livepeer.Asset) (int64, error) {
query := startViewsQuery(asset.PlaybackID, asset.PlaybackRecordingID)
return c.queryInt64(ctx, query)
}

func startViewsQuery(playbackID, playbackRecordingID string) string {
queryID := playbackID
if playbackRecordingID != "" {
queryID = fmt.Sprintf("(%s|%s)", playbackID, playbackRecordingID)
}
return fmt.Sprintf(
`sum(increase(mist_playux_count{strm=~"video(rec)?\\+%s"} [1y]))`,
queryID,
)
}

func (c *Prometheus) QueryRealtimeViews(ctx context.Context, userId string) (int64, error) {
query := fmt.Sprintf(`sum(mist_sessions{sessType="viewers", user_id="%s"})`, userId)
return c.queryInt64(ctx, query)
}

func (c *Prometheus) queryInt64(ctx context.Context, query string) (int64, error) {
value, warn, err := c.api.Query(ctx, query, time.Time{})
if len(warn) > 0 {
glog.Warningf("Prometheus query warnings: %q", warn)
Expand All @@ -51,14 +71,3 @@ func (c *Prometheus) QueryStartViews(ctx context.Context, asset *livepeer.Asset)
}
return int64(vec[0].Value), nil
}

func startViewsQuery(playbackID, playbackRecordingID string) string {
queryID := playbackID
if playbackRecordingID != "" {
queryID = fmt.Sprintf("(%s|%s)", playbackID, playbackRecordingID)
}
return fmt.Sprintf(
`sum(increase(mist_playux_count{strm=~"video(rec)?\\+%s"} [1y]))`,
queryID,
)
}

0 comments on commit b4b7663

Please sign in to comment.