Skip to content

Commit

Permalink
Add metered billing cron for services (#1285)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino authored Jan 5, 2024
1 parent 35c7587 commit 20eb55e
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 3 deletions.
6 changes: 5 additions & 1 deletion apps/core/lib/core/schema/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ defmodule Core.Schema.Cluster do
from(c in query, where: c.owner_id == ^user_id)
end

def for_account(query, account_id) do
def for_account(query \\ __MODULE__, account_id) do
from(c in query, where: c.account_id == ^account_id)
end

def services(query \\ __MODULE__) do
from(c in query, select: sum(c.service_count))
end

def ordered(query \\ __MODULE__, order \\ [asc: :name]) do
from(c in query, order_by: ^order)
end
Expand Down
4 changes: 4 additions & 0 deletions apps/core/lib/core/schema/platform_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ defmodule Core.Schema.PlatformSubscription do
)
end

def metered(query \\ __MODULE__) do
from(s in query, where: not is_nil(s.metered_id))
end

def ordered(query \\ __MODULE__, order \\ [asc: :id]) do
from(s in query, order_by: ^order)
end
Expand Down
10 changes: 10 additions & 0 deletions apps/core/lib/core/services/clusters.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ defmodule Core.Services.Clusters do
|> Core.Repo.exists?()
end

@doc """
Attempts to compute the service count for all clusters in an account
"""
@spec services(binary) :: integer | nil
def services(account_id) do
Cluster.for_account(account_id)
|> Cluster.services()
|> Core.Repo.one()
end

@doc """
Determines if all installations for a cluster have been synced
"""
Expand Down
13 changes: 12 additions & 1 deletion apps/core/lib/core/services/payments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Core.Services.Payments do
import Core.Policies.Payments

alias Core.PubSub
alias Core.Services.{Repositories, Accounts}
alias Core.Services.{Repositories, Accounts, Clusters}
alias Core.Schema.{
Publisher,
Account,
Expand Down Expand Up @@ -416,6 +416,17 @@ defmodule Core.Services.Payments do
|> cancel_platform_subscription(user)
end

def send_usage(%PlatformSubscription{metered_id: item_id, account_id: aid}) when is_binary(item_id) do
with services when is_integer(services) <- Clusters.services(aid) do
Stripe.SubscriptionItem.Usage.create(item_id, %{
timestamp: DateTime.utc_now() |> DateTime.to_unix(),
action: :set,
quantity: ceil(services / 5),
})
end
end
def send_usage(_), do: :ok

@doc """
Appends a new usage record for the given line item to stripe's api
"""
Expand Down
23 changes: 23 additions & 0 deletions apps/cron/lib/cron/task/metering.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Cron.Task.Metering do
@moduledoc """
Backfills all repository readmes
"""
use Cron
alias Core.Schema.PlatformSubscription
alias Core.Services.Payments

def run() do
PlatformSubscription.metered()
|> PlatformSubscription.ordered(asc: :id)
|> Core.Repo.stream(method: :keyset)
|> Core.throttle(count: 50, pause: :timer.seconds(1))
|> Flow.from_enumerable(stages: 5, max_demand: 5)
|> Flow.map(&deliver/1)
|> log()
end

defp deliver(%PlatformSubscription{} = subscription) do
Logger.info "sending usage records for account #{subscription.account_id}"
Payments.send_usage(subscription)
end
end
19 changes: 19 additions & 0 deletions apps/cron/test/cron/task/metering_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Cron.Task.MeteringTest do
use Core.SchemaCase, async: false
use Mimic
alias Cron.Task.Metering

setup :set_mimic_global

describe "#run/0" do
test "it will sync usage for all updated accounts" do
account = insert(:account)
insert(:platform_subscription, account: account, metered_id: "id_1")
insert(:cluster, account: account, service_count: 2)
insert(:cluster, account: account, service_count: 4)
expect(Stripe.SubscriptionItem.Usage, :create, fn "id_1", %{action: :set, quantity: 2} -> {:ok, "id_1"} end)

1 = Metering.run()
end
end
end
1 change: 1 addition & 0 deletions apps/cron/test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Mimic.copy(Core.Conduit.Broker)
Mimic.copy(Stripe.SubscriptionItem.Usage)
ExUnit.configure formatters: [JUnitFormatter, ExUnit.CLIFormatter]
ExUnit.start()
2 changes: 1 addition & 1 deletion plural/helm/plural/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: plural
description: A helm chart for installing plural
appVersion: 0.10.78
version: 0.10.78
version: 0.10.79
dependencies:
- name: hydra
version: 0.26.5
Expand Down
5 changes: 5 additions & 0 deletions plural/helm/plural/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,14 @@ crons:
cronModule: Prune.Notifications
cronTab: "15 1 * * *"
envVars: []
- cronName: plrl-metering
cronModule: Task.Metering
cronTab: "30 1 * * *"
envVars: []
- cronName: plrl-digest
cronModule: Digest.Pending
cronTab: "0 12 * * 1"
envVars: []

hydraSecrets:
dsn: memory
Expand Down

0 comments on commit 20eb55e

Please sign in to comment.