Skip to content

Commit

Permalink
remove grouped usage
Browse files Browse the repository at this point in the history
  • Loading branch information
gioelecerati committed Oct 31, 2023
1 parent e9ccd28 commit 7d6ec86
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 138 deletions.
60 changes: 0 additions & 60 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -175,10 +174,6 @@ func (h *apiHandler) usageHandler() chi.Router {
With(h.cache(true)).
MethodFunc("GET", `/query/active`, h.queryActiveUsersUsage())

h.withMetrics(router, "query_grouped_usage").
With(h.cache(true)).
MethodFunc("POST", `/query/grouped`, h.queryUsageGrouped())

return router
}

Expand Down Expand Up @@ -471,61 +466,6 @@ func (h *apiHandler) queryActiveUsersUsage() http.HandlerFunc {
}
}

func (h *apiHandler) queryUsageGrouped() http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {

var users []interface{}
if err := json.NewDecoder(r.Body).Decode(&users); err != nil {
respondError(rw, http.StatusBadRequest, err)
return
}

glog.Infof("Querying grouped usage for %v users", len(users))

_, ok := r.Context().Value(userIdContextKey).(string)
if !ok {
respondError(rw, http.StatusInternalServerError, errors.New("request not authenticated"))
return
}

isCallerAdmin, ok := r.Context().Value(isCallerAdminContextKey).(string)

if !ok {
respondError(rw, http.StatusInternalServerError, errors.New("request not authenticated - unable to determine if caller is admin"))
return
}

if isCallerAdmin != "true" {
respondError(rw, http.StatusForbidden, errors.New("only admins can query grouped usage"))
return
}

userList := make([]usage.GroupedParams, 0)

for _, user := range users {
groupedParams := usage.GroupedParams{
UserId: user.(map[string]interface{})["userId"].(string),
BillingCycleStart: user.(map[string]interface{})["billingCycleStart"].(string),
BillingCycleEnd: user.(map[string]interface{})["billingCycleEnd"].(string),
}

userList = append(userList, groupedParams)
}

query := usage.GroupedQuerySpec{
Users: userList,
}

usage, err := h.usage.QueryGroupedUsageSummary(r.Context(), query)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}

respondJson(rw, http.StatusOK, usage)
}
}

func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) {
assetID := apiParam(r, assetIDParam)

Expand Down
73 changes: 4 additions & 69 deletions usage/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"cloud.google.com/go/bigquery"
Expand Down Expand Up @@ -57,6 +56,10 @@ type ActiveUsersSummaryRow struct {
Email string `bigquery:"email" json:"email"`
From time.Time `bigquery:"interval_start_date" json:"from"`
To time.Time `bigquery:"interval_end_date" json:"to"`

DeliveryUsageMins float64 `bigquery:"delivery_usage_mins" json:"deliveryUsageMins"`
TotalUsageMins float64 `bigquery:"transcode_total_usage_mins" json:"totalUsageMins"`
StorageUsageMins float64 `bigquery:"storage_usage_mins" json:"storageUsageMins"`
}

type GroupedUsageRow struct {
Expand Down Expand Up @@ -90,7 +93,6 @@ type BigQuery interface {
QueryUsageSummaryWithTimestep(ctx context.Context, userID string, creatorID string, spec QuerySpec) (*[]UsageSummaryRow, error)
QueryTotalUsageSummary(ctx context.Context, spec FromToQuerySpec) (*[]TotalUsageSummaryRow, error)
QueryActiveUsersUsageSummary(ctx context.Context, spec FromToQuerySpec) (*[]ActiveUsersSummaryRow, error)
QueryGroupedUsageSummary(ctx context.Context, spec GroupedQuerySpec) (*[]GroupedUsageRow, error)
}

type BigQueryOptions struct {
Expand Down Expand Up @@ -240,30 +242,6 @@ func (bq *bigqueryHandler) QueryActiveUsersUsageSummary(ctx context.Context, spe
return &bqRows, nil
}

func (bq *bigqueryHandler) QueryGroupedUsageSummary(ctx context.Context, spec GroupedQuerySpec) (*[]GroupedUsageRow, error) {
sql, args, err := buildGroupedUsageSummaryQuery(bq.opts.HourlyUsageTable, spec)
if err != nil {
return nil, fmt.Errorf("error building active users summary query: %w", err)
}

bqRows, err := doBigQuery[GroupedUsageRow](bq, ctx, sql, args)
if err != nil {
return nil, fmt.Errorf("bigquery error: %w", err)
}

if err != nil {
return nil, fmt.Errorf("bigquery error: %w", err)
} else if len(bqRows) > maxBigQueryResultRows {
return nil, fmt.Errorf("query must return less than %d datapoints. consider decreasing your timeframe or increasing the time step", maxBigQueryResultRows)
}

if len(bqRows) == 0 {
return nil, nil
}

return &bqRows, nil
}

func buildUsageSummaryQuery(table string, userID string, creatorID string, spec QuerySpec) (string, []interface{}, error) {
if userID == "" {
return "", nil, fmt.Errorf("userID cannot be empty")
Expand Down Expand Up @@ -368,49 +346,6 @@ func buildActiveUsersUsageSummaryQuery(billingTable, usersTable string, spec Fro
return sql, args, nil
}

func buildGroupedUsageSummaryQuery(table string, spec GroupedQuerySpec) (string, []interface{}, error) {
var subqueryStrs []string
var allArgs []interface{}

for _, user := range spec.Users {
startTime, err := parseInputTimestamp(user.BillingCycleStart)
if err != nil {
return "", nil, err
}
endTime, err := parseInputTimestamp(user.BillingCycleEnd)
if err != nil {
return "", nil, err
}

subquery := squirrel.
Select(
"user_id",
"min(usage_hour_ts) as interval_start_date",
"max(usage_hour_ts) as interval_end_date",
"cast(sum(transcode_total_usage_mins) as FLOAT64) as transcode_total_usage_mins",
"cast(sum(delivery_usage_mins) as FLOAT64) as delivery_usage_mins",
"cast((sum(storage_usage_mins) / count(distinct usage_hour_ts)) as FLOAT64) as storage_usage_mins",
).
From(table).
Where(squirrel.Eq{"user_id": user.UserId}).
Where("usage_hour_ts BETWEEN TIMESTAMP_MILLIS(?) AND TIMESTAMP_MILLIS(?)", startTime.UnixMilli(), endTime.UnixMilli()).
GroupBy("user_id")

subSql, args, err := subquery.ToSql()
if err != nil {
return "", nil, err
}

subqueryStrs = append(subqueryStrs, subSql)
allArgs = append(allArgs, args...)
}

// Combine all subquery strings using "UNION ALL"
sql := strings.Join(subqueryStrs, " UNION ALL ")

return sql, allArgs, nil
}

// query helpers

func withUserIdFilter(query squirrel.SelectBuilder, userID string) squirrel.SelectBuilder {
Expand Down
9 changes: 0 additions & 9 deletions usage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,3 @@ func (c *Client) QueryActiveUsageSummary(ctx context.Context, spec FromToQuerySp

return summary, nil
}

func (c *Client) QueryGroupedUsageSummary(ctx context.Context, spec GroupedQuerySpec) (*[]GroupedUsageRow, error) {
summary, err := c.bigquery.QueryGroupedUsageSummary(ctx, spec)
if err != nil {
return nil, err
}

return summary, nil
}

0 comments on commit 7d6ec86

Please sign in to comment.