Skip to content

Commit

Permalink
Add Cluster Usage History Recording (#1220)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino authored Aug 29, 2023
1 parent 3384148 commit 58894a4
Show file tree
Hide file tree
Showing 12 changed files with 201 additions and 11 deletions.
39 changes: 39 additions & 0 deletions apps/core/lib/core/schema/cluster_usage_history.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Core.Schema.ClusterUsageHistory do
use Piazza.Ecto.Schema
alias Core.Schema.{Account, Cluster}

schema "cluster_usage_history" do
field :cpu, :integer
field :memory, :integer

belongs_to :cluster, Cluster
belongs_to :account, Account

timestamps()
end

def inserted_after(query \\ __MODULE__, dt) do
from(u in query, where: u.inserted_at >= ^dt)
end

def for_cluster(query \\ __MODULE__, cluster_id) do
from(u in query, where: u.cluster_id == ^cluster_id)
end

def for_account(query \\ __MODULE__, account_id) do
from(u in query, where: u.account_id == ^account_id)
end

def ordered(query \\ __MODULE__, order \\ [asc: :inserted_at]) do
from(u in query, order_by: ^order)
end

@valid ~w(cpu memory cluster_id account_id)a

def changeset(model, attrs \\ %{}) do
model
|> cast(attrs, @valid)
|> foreign_key_constraint(:account_id)
|> validate_required([:account_id, :cluster_id])
end
end
12 changes: 11 additions & 1 deletion apps/core/lib/core/services/clusters.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Core.Services.Clusters do
alias Core.PubSub
alias Core.Policies.Account, as: AccountPolicy
alias Core.Services.{Dns, Repositories, Users, Clusters.Transfer}
alias Core.Schema.{Cluster, User, DnsRecord, UpgradeQueue, ClusterDependency}
alias Core.Schema.{Cluster, User, DnsRecord, UpgradeQueue, ClusterDependency, ClusterUsageHistory}

@type error :: {:error, term}
@type cluster_resp :: {:ok, Cluster.t} | error
Expand Down Expand Up @@ -45,6 +45,16 @@ defmodule Core.Services.Clusters do
|> Core.Repo.insert()
end

@doc """
Saves a usage record for a given cluster
"""
@spec save_usage(map, Cluster.t) :: {:ok, ClusterUsageHistory.t} | error
def save_usage(attrs, %Cluster{id: id, account_id: account_id}) do
%ClusterUsageHistory{cluster_id: id, account_id: account_id}
|> ClusterUsageHistory.changeset(attrs)
|> Core.Repo.insert()
end

@doc """
Transfers ownership of a cluster to a new user. This has three main components:
* replicate the installations from one owner to another, this includes moving oidc providers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Core.Repo.Migrations.AddClusterUsage do
use Ecto.Migration

def change do
create table(:cluster_usage_history, primary_key: false) do
add :id, :uuid, primary_key: true
add :cluster_id, :uuid
add :account_id, :uuid
add :memory, :bigint
add :cpu, :bigint

timestamps()
end
end
end
13 changes: 13 additions & 0 deletions apps/core/test/services/clusters_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,17 @@ defmodule Core.Services.ClustersTest do
{:error, _} = Clusters.transfer_ownership(cluster.name, to, user)
end
end

describe "#save_usage/2" do
test "it will save a usage record for a cluster" do
cluster = insert(:cluster)

{:ok, usage} = Clusters.save_usage(%{cpu: 1000, memory: 1000}, cluster)

assert usage.cluster_id == cluster.id
assert usage.account_id == cluster.account_id
assert usage.cpu == 1000
assert usage.memory == 1000
end
end
end
9 changes: 9 additions & 0 deletions apps/core/test/support/factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,15 @@ defmodule Core.Factory do
}
end

def cluster_usage_history_factory do
%Schema.ClusterUsageHistory{
account: build(:account),
cluster: build(:cluster),
cpu: 1000,
memory: 10000000
}
end

def with_password(%Schema.User{} = user, password) do
Schema.User.changeset(user, %{password: password})
|> Ecto.Changeset.apply_changes()
Expand Down
10 changes: 9 additions & 1 deletion apps/graphql/lib/graphql/resolvers/cluster.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule GraphQl.Resolvers.Cluster do
use GraphQl.Resolvers.Base, model: Core.Schema.Cluster
alias Core.Services.Clusters
alias Core.Schema.{DeferredUpdate, ClusterDependency}
alias Core.Schema.{DeferredUpdate, ClusterDependency, ClusterUsageHistory}

def query(ClusterDependency, _), do: ClusterDependency
def query(_, _), do: Cluster
Expand All @@ -16,6 +16,14 @@ defmodule GraphQl.Resolvers.Cluster do
|> paginate(args)
end

def usage_history(cluster, begin) do
ClusterUsageHistory.for_cluster(cluster.id)
|> ClusterUsageHistory.ordered()
|> ClusterUsageHistory.inserted_after(begin)
|> Core.Repo.all()
|> ok()
end

def create_dependency(%{source_id: sid, dest_id: did}, %{context: %{current_user: user}}) do
source = Clusters.get_cluster!(sid)
dest = Clusters.get_cluster!(did)
Expand Down
17 changes: 17 additions & 0 deletions apps/graphql/lib/graphql/schema/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ defmodule GraphQl.Schema.Cluster do
cluster, _, _ -> Cluster.upgrade_info(cluster)
end

@desc "CPU/Memory history for this cluster"
field :usage_history, list_of(:cluster_usage_history) do
arg :begin, non_null(:datetime)
resolve fn cluster, %{begin: begin}, _ -> Cluster.usage_history(cluster, begin) end
end

field :dependency, :cluster_dependency, resolve: dataloader(Cluster), description: "the dependencies a cluster has"

field :owner, :user, resolve: dataloader(User), description: "The user that owns the cluster."
Expand Down Expand Up @@ -58,6 +64,17 @@ defmodule GraphQl.Schema.Cluster do
timestamps()
end

@desc "A record of the utilization in a given cluster"
object :cluster_usage_history do
field :cpu, :integer
field :memory, :integer

field :cluster, :cluster, resolve: dataloader(Cluster)
field :account, :account, resolve: dataloader(Account)

timestamps()
end

connection node_type: :cluster

object :cluster_queries do
Expand Down
21 changes: 21 additions & 0 deletions apps/graphql/test/queries/cluster_queries_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ defmodule GraphQl.ClusterQueriesTest do
assert found["dependency"]["id"] == dep.id
end

test "it can fetch usage history for a cluster" do
user = insert(:user)
cluster = insert(:cluster, owner: user)
history = insert(:cluster_usage_history, cluster: cluster)
begin = Timex.now() |> Timex.shift(hours: -3) |> DateTime.to_iso8601()

{:ok, %{data: %{"cluster" => found}}} = run_query("""
query Cluster($id: ID!, $begin: DateTime!) {
cluster(id: $id) {
id
usageHistory(begin: $begin) { cpu memory }
}
}
""", %{"id" => cluster.id, "begin" => begin}, %{current_user: user})

assert found["id"] == cluster.id
[hist] = found["usageHistory"]
assert hist["cpu"] == history.cpu
assert hist["memory"] == history.memory
end

test "it can query upgrade info" do
user = insert(:user)
cluster = insert(:cluster, owner: user)
Expand Down
13 changes: 11 additions & 2 deletions apps/rtc/lib/rtc_web/channels/upgrade_channel.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule RtcWeb.UpgradeChannel do
use RtcWeb, :channel
alias Rtc.UpgradeBuffer
alias Core.Schema.{Upgrade}
alias Core.Services.Upgrades
alias Core.Schema.{Upgrade, Cluster}
alias Core.Services.{Upgrades, Clusters}

@ping_interval 60 * 1000

Expand Down Expand Up @@ -66,6 +66,15 @@ defmodule RtcWeb.UpgradeChannel do
end
end

def handle_in("usage", attrs, socket) do
case Core.Repo.preload(socket.assigns.queue, [:cluster]) do
%{cluster: %Cluster{} = cluster} = queue ->
Clusters.save_usage(attrs, cluster)
{:reply, :ok, assign(socket, :queue, queue)}
_ -> {:reply, {:error, "no cluster found"}, socket}
end
end

def handle_in("ack", %{"id" => id}, socket) do
case Upgrades.ack(id, socket.assigns.queue) do
{:ok, q} ->
Expand Down
17 changes: 17 additions & 0 deletions apps/rtc/test/rtc_web/channels/upgrade_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,22 @@ defmodule RtcWeb.UpgradeChannelTest do

refute_push "more", _
end

test "it can receive usage history" do
user = insert(:user)
cluster = insert(:cluster, owner: user)
q = insert(:upgrade_queue, user: user, cluster: cluster)
up = insert(:upgrade, queue: q)

{:ok, socket} = mk_socket(user)
{:ok, _, socket} = subscribe_and_join(socket, "queues:#{q.id}", %{})

ref = push(socket, "usage", %{"cpu" => 1000, "memory" => 10000})
assert_reply ref, :ok, _

[hist] = Core.Schema.ClusterUsageHistory.for_cluster(cluster.id) |> Core.Repo.all()
assert hist.cpu == 1000
assert hist.memory == 10000
end
end
end
27 changes: 20 additions & 7 deletions schema/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,9 @@ type Cluster {
"pending upgrades for each installed app"
upgradeInfo: [UpgradeInfo]

"CPU\/Memory history for this cluster"
usageHistory(begin: DateTime!): [ClusterUsageHistory]

"the dependencies a cluster has"
dependency: ClusterDependency

Expand Down Expand Up @@ -2159,6 +2162,16 @@ type Stack {
updatedAt: DateTime
}

"A record of the utilization in a given cluster"
type ClusterUsageHistory {
cpu: Int
memory: Int
cluster: Cluster
account: Account
insertedAt: DateTime
updatedAt: DateTime
}

type Community {
discord: String
slack: String
Expand Down Expand Up @@ -2425,19 +2438,19 @@ type UpgradePath {
type: ValueType!
}

type RoleBinding {
type VersionTag {
id: ID!
user: User
group: Group
tag: String!
version: Version
chart: Chart
insertedAt: DateTime
updatedAt: DateTime
}

type VersionTag {
type RoleBinding {
id: ID!
tag: String!
version: Version
chart: Chart
user: User
group: Group
insertedAt: DateTime
updatedAt: DateTime
}
Expand Down
19 changes: 19 additions & 0 deletions www/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,14 @@ export type Cluster = {
updatedAt?: Maybe<Scalars['DateTime']['output']>;
/** pending upgrades for each installed app */
upgradeInfo?: Maybe<Array<Maybe<UpgradeInfo>>>;
/** CPU/Memory history for this cluster */
usageHistory?: Maybe<Array<Maybe<ClusterUsageHistory>>>;
};


/** A Kubernetes cluster that can be used to deploy applications on with Plural. */
export type ClusterUsageHistoryArgs = {
begin: Scalars['DateTime']['input'];
};

/** Input for creating or updating a cluster. */
Expand Down Expand Up @@ -494,6 +502,17 @@ export type ClusterInformationAttributes = {
version?: InputMaybe<Scalars['String']['input']>;
};

/** A record of the utilization in a given cluster */
export type ClusterUsageHistory = {
__typename?: 'ClusterUsageHistory';
account?: Maybe<Account>;
cluster?: Maybe<Cluster>;
cpu?: Maybe<Scalars['Int']['output']>;
insertedAt?: Maybe<Scalars['DateTime']['output']>;
memory?: Maybe<Scalars['Int']['output']>;
updatedAt?: Maybe<Scalars['DateTime']['output']>;
};

export type Community = {
__typename?: 'Community';
discord?: Maybe<Scalars['String']['output']>;
Expand Down

0 comments on commit 58894a4

Please sign in to comment.