Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Usage API: breakdownBy=creatorId #183

Merged
merged 13 commits into from
Feb 29, 2024
6 changes: 3 additions & 3 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ func (h *apiHandler) queryUsage() http.HandlerFunc {
}

qs := r.URL.Query()
creatorId := qs.Get("creatorId")
victorges marked this conversation as resolved.
Show resolved Hide resolved

query := usage.QuerySpec{
From: from,
Expand All @@ -353,18 +352,19 @@ func (h *apiHandler) queryUsage() http.HandlerFunc {
UserID: userId,
CreatorID: qs.Get("creatorId"),
},
BreakdownBy: qs["breakdownBy[]"],
}

if qs.Get("timeStep") == "" {
usage, err := h.usage.QuerySummary(r.Context(), userId, creatorId, query)
usage, err := h.usage.QuerySummary(r.Context(), userId, query)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

respondJson(rw, http.StatusOK, usage)
} else {
usage, err := h.usage.QuerySummaryWithTimestep(r.Context(), userId, creatorId, query)
usage, err := h.usage.QuerySummaryWithTimestep(r.Context(), userId, query)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
Expand Down
84 changes: 49 additions & 35 deletions usage/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"cloud.google.com/go/bigquery"
Expand All @@ -18,9 +19,14 @@ type QueryFilter struct {
}

type QuerySpec struct {
TimeStep string
From, To *time.Time
Filter QueryFilter
TimeStep string
From, To *time.Time
Filter QueryFilter
BreakdownBy []string
}

var usageBreakdownFields = map[string]string{
"creatorId": "creator_id",
}

type FromToQuerySpec struct {
Expand All @@ -33,12 +39,13 @@ var allowedTimeSteps = map[string]bool{
}

type UsageSummaryRow struct {
UserID string `bigquery:"user_id"`
CreatorID string `bigquery:"creator_id"`
UserID string `bigquery:"user_id"`
CreatorID bigquery.NullString `bigquery:"creator_id"`
TimeInterval time.Time `bigquery:"time_interval"`

DeliveryUsageMins float64 `bigquery:"delivery_usage_mins"`
TotalUsageMins float64 `bigquery:"transcode_total_usage_mins"`
StorageUsageMins float64 `bigquery:"storage_usage_mins"`
DeliveryUsageMins bigquery.NullFloat64 `bigquery:"delivery_usage_mins"`
TotalUsageMins bigquery.NullFloat64 `bigquery:"transcode_total_usage_mins"`
StorageUsageMins bigquery.NullFloat64 `bigquery:"storage_usage_mins"`
}

type ActiveUsersSummaryRow struct {
Expand Down Expand Up @@ -69,10 +76,10 @@ type TotalUsageSummaryRow struct {
}

type BigQuery interface {
QueryUsageSummary(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*UsageSummaryRow, error)
QueryUsageSummaryWithTimestep(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*[]UsageSummaryRow, error)
QueryTotalUsageSummary(ctx context.Context, spec FromToQuerySpec) (*[]TotalUsageSummaryRow, error)
QueryActiveUsersUsageSummary(ctx context.Context, spec FromToQuerySpec) (*[]ActiveUsersSummaryRow, error)
QueryUsageSummary(ctx context.Context, userID string, spec QuerySpec) ([]UsageSummaryRow, error)
QueryUsageSummaryWithTimestep(ctx context.Context, userID string, spec QuerySpec) ([]UsageSummaryRow, error)
QueryTotalUsageSummary(ctx context.Context, spec FromToQuerySpec) ([]TotalUsageSummaryRow, error)
QueryActiveUsersUsageSummary(ctx context.Context, spec FromToQuerySpec) ([]ActiveUsersSummaryRow, error)
}

type BigQueryOptions struct {
Expand Down Expand Up @@ -125,33 +132,28 @@ type bigqueryHandler struct {

// usage summary query

func (bq *bigqueryHandler) QueryUsageSummary(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*UsageSummaryRow, error) {
sql, args, err := buildUsageSummaryQuery(bq.opts.HourlyUsageTable, userID, creatorID, spec)
func (bq *bigqueryHandler) QueryUsageSummary(ctx context.Context, userID string, spec QuerySpec) ([]UsageSummaryRow, error) {
sql, args, err := buildUsageSummaryQuery(bq.opts.HourlyUsageTable, userID, spec)
if err != nil {
return nil, fmt.Errorf("error building usage summary query: %w", err)
}

bqRows, err := doBigQuery[UsageSummaryRow](bq, ctx, sql, args)
if err != nil {
return nil, fmt.Errorf("bigquery error: %w", err)
} else if len(bqRows) > 1 {
return nil, fmt.Errorf("internal error, query returned %d rows", len(bqRows))
} else if len(bqRows) > maxBigQueryResultRows {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a separate check for when we have breakdown vs when we don't? When we don't have any breakdown there should be only 1 result, so it'd be an error to have more results

return nil, fmt.Errorf("query must return less than %d datapoints. consider decreasing your timeframe or increasing the time step", maxBigQueryResultRows)
}

if len(bqRows) == 0 {
return &UsageSummaryRow{
UserID: userID,
CreatorID: creatorID,
DeliveryUsageMins: 0,
TotalUsageMins: 0,
StorageUsageMins: 0,
}, nil
}
return &bqRows[0], nil
return nil, nil
}

return bqRows, nil
}

func (bq *bigqueryHandler) QueryUsageSummaryWithTimestep(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*[]UsageSummaryRow, error) {
sql, args, err := buildUsageSummaryQuery(bq.opts.HourlyUsageTable, userID, creatorID, spec)
func (bq *bigqueryHandler) QueryUsageSummaryWithTimestep(ctx context.Context, userID string, spec QuerySpec) ([]UsageSummaryRow, error) {

sql, args, err := buildUsageSummaryQuery(bq.opts.HourlyUsageTable, userID, spec)
if err != nil {
return nil, fmt.Errorf("error building usage summary query: %w", err)
}
Expand All @@ -171,10 +173,10 @@ func (bq *bigqueryHandler) QueryUsageSummaryWithTimestep(ctx context.Context, us
return nil, nil
}

return &bqRows, nil
return bqRows, nil
}

func (bq *bigqueryHandler) QueryTotalUsageSummary(ctx context.Context, spec FromToQuerySpec) (*[]TotalUsageSummaryRow, error) {
func (bq *bigqueryHandler) QueryTotalUsageSummary(ctx context.Context, spec FromToQuerySpec) ([]TotalUsageSummaryRow, error) {
sql, args, err := buildTotalUsageSummaryQuery(bq.opts.DailyUsageTable, spec)
if err != nil {
return nil, fmt.Errorf("error building usage summary query: %w", err)
Expand All @@ -195,10 +197,10 @@ func (bq *bigqueryHandler) QueryTotalUsageSummary(ctx context.Context, spec From
return nil, nil
}

return &bqRows, nil
return bqRows, nil
}

func (bq *bigqueryHandler) QueryActiveUsersUsageSummary(ctx context.Context, spec FromToQuerySpec) (*[]ActiveUsersSummaryRow, error) {
func (bq *bigqueryHandler) QueryActiveUsersUsageSummary(ctx context.Context, spec FromToQuerySpec) ([]ActiveUsersSummaryRow, error) {
sql, args, err := buildActiveUsersUsageSummaryQuery(bq.opts.HourlyUsageTable, bq.opts.UsersTable, spec)
if err != nil {
return nil, fmt.Errorf("error building active users summary query: %w", err)
Expand All @@ -219,10 +221,10 @@ func (bq *bigqueryHandler) QueryActiveUsersUsageSummary(ctx context.Context, spe
return nil, nil
}

return &bqRows, nil
return bqRows, nil
}

func buildUsageSummaryQuery(table string, userID string, creatorID string, spec QuerySpec) (string, []interface{}, error) {
func buildUsageSummaryQuery(table string, userID string, spec QuerySpec) (string, []interface{}, error) {
if userID == "" {
return "", nil, fmt.Errorf("userID cannot be empty")
}
Expand All @@ -235,7 +237,7 @@ func buildUsageSummaryQuery(table string, userID string, creatorID string, spec
Limit(maxBigQueryResultRows + 1)

if creatorId := spec.Filter.CreatorID; creatorId != "" {
query = query.Where("creator_id = ?", creatorID)
query = query.Columns("creator_id").Where("creator_id = ?", creatorId).GroupBy("creator_id")
}

if from := spec.From; from != nil {
Expand All @@ -258,6 +260,18 @@ func buildUsageSummaryQuery(table string, userID string, creatorID string, spec

query = withUserIdFilter(query, userID)

for _, by := range spec.BreakdownBy {
field, ok := usageBreakdownFields[by]
if !ok {
return "", nil, fmt.Errorf("invalid breakdown field: %s", by)
}
// skip breakdowns that are already in the query
if sql, _, _ := query.ToSql(); strings.Contains(sql, field) {
continue
}
query = query.Columns(field).GroupBy(field)
}

sql, args, err := query.ToSql()
if err != nil {
return "", nil, err
Expand Down
77 changes: 69 additions & 8 deletions usage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,26 @@ package usage
import (
"context"
"fmt"
"strings"

"cloud.google.com/go/bigquery"
livepeer "github.com/livepeer/go-api-client"
"github.com/livepeer/livepeer-data/pkg/data"
)

type Metric struct {
TimeInterval *int64 `json:"TimeInterval,omitempty"`

// breakdown fields
UserID string `json:"UserID,omitempty"`
victorges marked this conversation as resolved.
Show resolved Hide resolved
CreatorID data.Nullable[string] `json:"CreatorID,omitempty"`

// metric data
DeliveryUsageMins data.Nullable[float64] `json:"DeliveryUsageMins,omitempty"`
TotalUsageMins data.Nullable[float64] `json:"TotalUsageMins,omitempty"`
StorageUsageMins data.Nullable[float64] `json:"StorageUsageMins,omitempty"`
}

type Client struct {
opts ClientOptions
lp *livepeer.Client
Expand All @@ -30,25 +46,70 @@ func NewClient(opts ClientOptions) (*Client, error) {
return &Client{opts, lp, bigquery}, nil
}

func (c *Client) QuerySummary(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*UsageSummaryRow, error) {
summary, err := c.bigquery.QueryUsageSummary(ctx, userID, creatorID, spec)
func (c *Client) QuerySummary(ctx context.Context, userID string, spec QuerySpec) ([]Metric, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change from *UsageSummaryRow to []Metric is another breaking change. If a client was calling this API before and parsing the response as an object, it will now explode cause it's getting an array.

My suggestion is to build on top of the QuerySummary vs QuerySummaryWithTimestep separation, given the latter already returns an array. So, instead of changing every response to be an array, we could change QuerySummaryWithTimestep to be QuerySummaryWithBreakdown instead and update the check here to also call the breakdown version when len(breakdownBy) > 0. And then we leave the no-breakdown version still returning an object not to break any existing integrations. WDYT?

summary, err := c.bigquery.QueryUsageSummary(ctx, userID, spec)
if err != nil {
return nil, err
}

return summary, nil
metrics := usageSummaryToMetrics(summary, spec)
return metrics, nil
}

func usageSummaryToMetrics(rows []UsageSummaryRow, spec QuerySpec) []Metric {
metrics := make([]Metric, len(rows))
for i, row := range rows {
m := Metric{
UserID: row.UserID,
CreatorID: toStringPtr(row.CreatorID, spec.hasBreakdownBy("creatorId")),
DeliveryUsageMins: toFloat64Ptr(row.DeliveryUsageMins, true),
TotalUsageMins: toFloat64Ptr(row.TotalUsageMins, true),
StorageUsageMins: toFloat64Ptr(row.StorageUsageMins, true),
}

if !row.TimeInterval.IsZero() {
timestamp := row.TimeInterval.UnixMilli()
m.TimeInterval = &timestamp
}

metrics[i] = m
}
return metrics
}

func (c *Client) QuerySummaryWithTimestep(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*[]UsageSummaryRow, error) {
summary, err := c.bigquery.QueryUsageSummaryWithTimestep(ctx, userID, creatorID, spec)
func toFloat64Ptr(bqFloat bigquery.NullFloat64, asked bool) data.Nullable[float64] {
return data.ToNullable(bqFloat.Float64, bqFloat.Valid, asked)
}

func toStringPtr(bqStr bigquery.NullString, asked bool) data.Nullable[string] {
return data.ToNullable(bqStr.StringVal, bqStr.Valid, asked)
}

func (q *QuerySpec) hasBreakdownBy(e string) bool {
// callers always set `e` as a string literal so we can panic if it's not valid
if usageBreakdownFields[e] == "" {
panic(fmt.Sprintf("unknown breakdown field %q", e))
}

for _, a := range q.BreakdownBy {
if strings.EqualFold(a, e) {
return true
}
}
return false
}

func (c *Client) QuerySummaryWithTimestep(ctx context.Context, userID string, spec QuerySpec) ([]Metric, error) {
summary, err := c.bigquery.QueryUsageSummaryWithTimestep(ctx, userID, spec)
if err != nil {
return nil, err
}

return summary, nil
metrics := usageSummaryToMetrics(summary, spec)
return metrics, nil
}

func (c *Client) QueryTotalSummary(ctx context.Context, spec FromToQuerySpec) (*[]TotalUsageSummaryRow, error) {
func (c *Client) QueryTotalSummary(ctx context.Context, spec FromToQuerySpec) ([]TotalUsageSummaryRow, error) {
summary, err := c.bigquery.QueryTotalUsageSummary(ctx, spec)
if err != nil {
return nil, err
Expand All @@ -57,7 +118,7 @@ func (c *Client) QueryTotalSummary(ctx context.Context, spec FromToQuerySpec) (*
return summary, nil
}

func (c *Client) QueryActiveUsageSummary(ctx context.Context, spec FromToQuerySpec) (*[]ActiveUsersSummaryRow, error) {
func (c *Client) QueryActiveUsageSummary(ctx context.Context, spec FromToQuerySpec) ([]ActiveUsersSummaryRow, error) {
summary, err := c.bigquery.QueryActiveUsersUsageSummary(ctx, spec)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions views/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"cloud.google.com/go/bigquery"
"github.com/Masterminds/squirrel"
"github.com/golang/glog"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
Expand Down Expand Up @@ -279,7 +278,6 @@ func doBigQuery[RowT any](bq *bigqueryHandler, ctx context.Context, sql string,
query := bq.client.Query(sql)
query.Parameters = toBigQueryParameters(args)
query.MaxBytesBilled = bq.opts.MaxBytesBilledPerBigQuery
glog.V(10).Infof("Running query. sql=%q args=%s", sql, args)

it, err := query.Read(ctx)
if err != nil {
Expand Down Expand Up @@ -311,5 +309,6 @@ func toTypedValues[RowT any](it *bigquery.RowIterator) ([]RowT, error) {

values = append(values, row)
}

return values, nil
}
Loading