diff --git a/api/handler.go b/api/handler.go index 6690946..d5d5d07 100644 --- a/api/handler.go +++ b/api/handler.go @@ -133,6 +133,10 @@ func (h *apiHandler) viewershipHandler() chi.Router { With(h.cache(false)). MethodFunc("GET", fmt.Sprintf(`/{%s}/total`, assetIDParam), h.getTotalViews) + // realtime viewership server side (internal-only) + h.withMetrics(router, "query_realtime_server_viewership"). + MethodFunc("GET", "/internal/server/now", h.queryRealtimeServerViewership()) + // total views public API h.withMetrics(router, "query_total_viewership"). With(h.cache(false)). @@ -558,6 +562,29 @@ func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) { respondJson(rw, http.StatusOK, totalViews) } +func (h *apiHandler) queryRealtimeServerViewership() http.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) { + userId := r.URL.Query().Get("userId") + if userId == "" { + respondError(rw, http.StatusBadRequest, errors.New("userId is required")) + return + } + + if !isCallerAdmin(r) { + respondError(rw, http.StatusForbidden, errors.New("only admins can query server-side viewership")) + return + } + + metrics, err := h.views.QueryRealtimeServerViews(r.Context(), userId) + if err != nil { + respondError(rw, http.StatusInternalServerError, err) + return + } + + respondJson(rw, http.StatusOK, metrics) + } +} + func (h *apiHandler) getStreamHealth(rw http.ResponseWriter, r *http.Request) { respondJson(rw, http.StatusOK, getStreamStatus(r)) } diff --git a/views/client.go b/views/client.go index 7784128..fab6eb7 100644 --- a/views/client.go +++ b/views/client.go @@ -107,6 +107,16 @@ func (c *Client) Deprecated_GetTotalViews(ctx context.Context, id string) ([]Tot }}, nil } +func (c *Client) QueryRealtimeServerViews(ctx context.Context, userId string) ([]Metric, error) { + viewCount, err := c.prom.QueryRealtimeViews(ctx, userId) + if err != nil { + return nil, fmt.Errorf("error querying start views: %w", err) + } + return []Metric{{ + ViewCount: viewCount, + }}, nil +} + func (c *Client) QuerySummary(ctx context.Context, playbackID string) (*Metric, error) { summary, err := c.bigquery.QueryViewsSummary(ctx, playbackID) if err != nil { diff --git a/views/prometheus.go b/views/prometheus.go index 9033597..f655553 100644 --- a/views/prometheus.go +++ b/views/prometheus.go @@ -33,6 +33,26 @@ func NewPrometheus(config promClient.Config) (*Prometheus, error) { func (c *Prometheus) QueryStartViews(ctx context.Context, asset *livepeer.Asset) (int64, error) { query := startViewsQuery(asset.PlaybackID, asset.PlaybackRecordingID) + return c.queryInt64(ctx, query) +} + +func startViewsQuery(playbackID, playbackRecordingID string) string { + queryID := playbackID + if playbackRecordingID != "" { + queryID = fmt.Sprintf("(%s|%s)", playbackID, playbackRecordingID) + } + return fmt.Sprintf( + `sum(increase(mist_playux_count{strm=~"video(rec)?\\+%s"} [1y]))`, + queryID, + ) +} + +func (c *Prometheus) QueryRealtimeViews(ctx context.Context, userId string) (int64, error) { + query := fmt.Sprintf(`sum(mist_sessions{sessType="viewers", user_id="%s"})`, userId) + return c.queryInt64(ctx, query) +} + +func (c *Prometheus) queryInt64(ctx context.Context, query string) (int64, error) { value, warn, err := c.api.Query(ctx, query, time.Time{}) if len(warn) > 0 { glog.Warningf("Prometheus query warnings: %q", warn) @@ -51,14 +71,3 @@ func (c *Prometheus) QueryStartViews(ctx context.Context, asset *livepeer.Asset) } return int64(vec[0].Value), nil } - -func startViewsQuery(playbackID, playbackRecordingID string) string { - queryID := playbackID - if playbackRecordingID != "" { - queryID = fmt.Sprintf("(%s|%s)", playbackID, playbackRecordingID) - } - return fmt.Sprintf( - `sum(increase(mist_playux_count{strm=~"video(rec)?\\+%s"} [1y]))`, - queryID, - ) -}