Skip to content

Commit

Permalink
Fix pipeline context genserver apply (#1227)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino authored Jul 27, 2024
1 parent b285ed5 commit 4315349
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 50 deletions.
6 changes: 3 additions & 3 deletions config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ config :console, Console.Cron.Scheduler,
{"*/10 * * * *", {Console.Deployments.Cron, :dequeue_stacks, []}},
{"*/10 * * * *", {Console.Deployments.Cron, :place_run_workers, []}},
{"*/30 * * * *", {Console.Deployments.Cron, :spawn_stack_crons, []}},
{"*/2 * * * *", {Console.Deployments.Cron, :scan_pipeline_stages, []}},
{"*/2 * * * *", {Console.Deployments.Cron, :scan_pending_promotions, []}},
{"*/2 * * * *", {Console.Deployments.Cron, :scan_pending_contexts, []}},
{"*/4 * * * *", {Console.Deployments.Cron, :scan_pipeline_stages, []}},
{"*/4 * * * *", {Console.Deployments.Cron, :scan_pending_promotions, []}},
{"*/4 * * * *", {Console.Deployments.Cron, :scan_pending_contexts, []}},
{"*/10 * * * *", {Console.Deployments.Init, :ensure_secret, []}},
{"0 0 1-31/2 * *", {Console.Deployments.Cron, :backfill_deprecations, []}},
{"*/10 * * * *", {Console.Deployments.Cron, :backfill_global_services, []}},
Expand Down
83 changes: 59 additions & 24 deletions lib/console/deployments/pipelines.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Console.Deployments.Pipelines do
use Console.Services.Base
require Logger
use Nebulex.Caching
import Console.Deployments.Policies
import Console.Deployments.Pipelines.Stability
Expand Down Expand Up @@ -123,7 +124,8 @@ defmodule Console.Deployments.Pipelines do
end)
|> execute()
end)
|> execute(extract: :ctx)
|> execute()
|> flush_context_events(fn %{link: links} -> Map.values(links) end, :ctx)
|> notify(:create, user)
end

Expand All @@ -132,7 +134,6 @@ defmodule Console.Deployments.Pipelines do
stage
|> PipelineStage.changeset(%{context_id: ctx_id})
|> Repo.update()
|> notify(:update)
end

@doc """
Expand All @@ -145,8 +146,11 @@ defmodule Console.Deployments.Pipelines do
|> Repo.update()
end

def apply_pipeline_context(%PipelineStage{context_id: ctx_id, applied_context_id: ctx_id} = stage),
do: {:ok, stage}
def apply_pipeline_context(%PipelineStage{context_id: ctx_id, applied_context_id: ctx_id} = stage) do
Logger.info "ignoring applying existing context to stage #{stage.id}"
{:ok, stage}
end

def apply_pipeline_context(%PipelineStage{} = stage) do
bot = %{Users.get_bot!("console") | roles: %{admin: true}}
%{context: ctx} = stage = Repo.preload(stage, [:pipeline, :context, :errors, services: [:service, criteria: :pr_automation]])
Expand Down Expand Up @@ -260,6 +264,7 @@ defmodule Console.Deployments.Pipelines do
|> PipelineGate.update_changeset(attrs)
|> allow(cluster, :update)
|> when_ok(:update)
|> notify(:update)
end

@doc """
Expand Down Expand Up @@ -306,7 +311,7 @@ defmodule Console.Deployments.Pipelines do
def build_promotion(%PipelineStage{id: id} = stage) do
start_transaction()
|> add_operation(:stage, fn _ ->
case Repo.preload(stage, [:from_edges, :context, promotion: [services: :revision], services: [service: :revision]]) do
case Repo.preload(stage, [:context, from_edges: :gates, promotion: [services: :revision], services: [service: :revision]]) do
%{from_edges: [_ | _]} = stage -> {:ok, stage}
_ -> {:error, "this stage has no successors"}
end
Expand All @@ -322,29 +327,32 @@ defmodule Console.Deployments.Pipelines do
old = extant(promo)
|> Map.drop(Enum.map(svcs, fn {%{id: id}, _} -> id end))
|> Map.values()
|> Console.mapify()
|> Console.mapify() # prior services to promote w/o a healthy revision yet
# services w/ a new healthy revision
new = Enum.map(svcs, fn {%{id: id, sha: sha}, %{id: rid}} -> %{service_id: id, revision_id: rid, sha: sha} end)
case promo do
nil -> %PipelinePromotion{stage_id: id}
%PipelinePromotion{} = promo -> promo
end
|> PipelinePromotion.changeset(add_revised(%{services: old ++ new}, diff?(stage, svcs, promo)))
|> PipelinePromotion.changeset(add_revised(%{services: stabilize_promo(old ++ new, promo)}, legacy_diff?(stage, svcs, promo)))
|> PipelinePromotion.changeset(%{context_id: stage.context_id})
|> Repo.insert_or_update()
end)
|> add_operation(:gates, fn %{stage: %{id: id}, build: promo} ->
case promo.revised do
true ->
PipelineGate.for_stage(id)
|> PipelineGate.selected()
|> Repo.update_all(set: [state: :pending, approver_id: nil])
|> elem(1)
|> send_updates()
|> ok()
_ -> {:ok, 0}
end
|> add_operation(:revised, fn %{build: promo, services: svcs, stage: stage} ->
PipelinePromotion.changeset(promo, add_revised(%{}, diff?(stage, svcs, promo)))
|> Repo.update()
end)
|> execute(extract: :build)
|> add_operation(:gates, fn
%{stage: %{id: id}, revised: %{revised: true}} ->
PipelineGate.for_stage(id)
|> PipelineGate.selected()
|> Repo.update_all(set: [state: :pending, approver_id: nil, updated_at: Timex.now()])
|> elem(1)
|> send_updates()
|> ok()
_ -> {:ok, 0}
end)
|> execute(extract: :revised)
|> notify(:create)
end

Expand Down Expand Up @@ -381,7 +389,14 @@ defmodule Console.Deployments.Pipelines do
_ -> {:ok, promo}
end
end)
|> execute(extract: :finish)
|> execute()
|> flush_context_events(fn %{resolve: resolve} ->
Enum.filter(resolve, fn
{{:ctx, _}, _} -> true
_ -> false
end)
|> Enum.map(fn {_, stage} -> stage end)
end, :finish)
end

defp entry_stages(%Pipeline{stages: stages, edges: edges}) when is_list(stages) and is_list(edges) do
Expand Down Expand Up @@ -447,11 +462,16 @@ defmodule Console.Deployments.Pipelines do
defp diff?(%PipelineStage{context_id: id}, _, %PipelinePromotion{applied_context_id: id})
when is_binary(id), do: false

defp diff?(%PipelineStage{context: %PipelineContext{inserted_at: at}}, _, %PipelinePromotion{} = next) do
Enum.all?(next.services, &Timex.after?(coalesce(&1.revision.updated_at, &1.revision.inserted_at), at))
defp diff?(%PipelineStage{context: %PipelineContext{inserted_at: at}} = promos, _, %PipelinePromotion{} = next) do
Repo.preload(next, [services: :revision], force: true)
|> Map.get(:services)
|> Enum.all?(&Timex.after?(coalesce(&1.revision.updated_at, &1.revision.inserted_at), at)) && gates_stale?(promos)
end

defp diff?(_, svcs, %PipelinePromotion{services: [_ | _]} = promo) do
defp diff?(_, _, _), do: false

defp legacy_diff?(_, [], _), do: false
defp legacy_diff?(_, svcs, %PipelinePromotion{services: [_ | _]} = promo) do
by_id = extant(promo)
Enum.any?(svcs, fn {%{sha: sha} = svc, %{id: r}} ->
case by_id[svc.id] do
Expand All @@ -461,14 +481,29 @@ defmodule Console.Deployments.Pipelines do
end
end)
end
defp legacy_diff?(_, _, _), do: true

defp diff?(_, _, _), do: true
defp gates_stale?(%PipelineStage{context: %{inserted_at: at}, from_edges: edges}) do
Enum.flat_map(edges, & &1.gates)
|> Enum.all?(&Timex.before?(coalesce(&1.updated_at, &1.inserted_at), at))
end
defp gates_stale?(_), do: true

defp send_updates(gates) do
Enum.each(gates, &handle_notify(PubSub.PipelineGateUpdated, &1))
gates
end

defp flush_context_events({:ok, result}, mapper, extract) do
case mapper.(result) do
[_ | _] = stages -> Enum.each(stages, &handle_notify(PubSub.PipelineStageUpdated, &1))
%PipelineStage{} = stage -> handle_notify(PubSub.PipelineStageUpdated, stage)
_ -> :ok
end
{:ok, result[extract]}
end
defp flush_context_events(err, _, _), do: err

defp notify({:ok, %PipelinePromotion{} = promo}, :create),
do: handle_notify(PubSub.PromotionCreated, promo)
defp notify({:ok, %PipelineStage{} = stage}, :update),
Expand Down
2 changes: 1 addition & 1 deletion lib/console/deployments/pipelines/discovery.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Console.Deployments.Pipelines.Discovery do
alias Console.Schema.{PipelinePromotion, PipelineStage}
alias Console.Deployments.Pipelines.{PromotionWorker, StageWorker}

@shards 5
@shards 10
@shard_ring HashRing.new() |> HashRing.add_nodes(Enum.to_list(0..@shards))

def promotion(%PipelinePromotion{id: id} = promo),
Expand Down
3 changes: 2 additions & 1 deletion lib/console/deployments/pipelines/promotion_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ defmodule Console.Deployments.Pipelines.PromotionWorker do
end

def dispatch(shard, %PipelinePromotion{} = promo),
do: GenServer.call(name(shard), promo)
do: GenServer.call(name(shard), promo, 10_000)

def name(shard), do: {:via, Registry, {Supervisor.registry(), {:promotion, :shard, shard}}}

def handle_call(%PipelinePromotion{} = promo, _, state) do
Logger.info "attempting to apply promotion #{promo.id}"
case Pipelines.apply_promotion(promo) do
{:ok, _} -> Logger.info "promotion #{promo.id} applied successfully"
{:error, err} -> Logger.info "failed to apply promotion #{promo.id} reason: #{inspect(err)}"
Expand Down
7 changes: 7 additions & 0 deletions lib/console/deployments/pipelines/stability.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
defmodule Console.Deployments.Pipelines.Stability do
alias Console.Deployments.{Services, Clusters}
alias Console.Schema.PipelinePromotion

def stabilize_promo(svcs, %PipelinePromotion{services: [_ | _] = old}) do
by_svcs = Map.new(old, & {&1.service_id, &1.id})
Enum.map(svcs, &Map.put(&1, :id, by_svcs[&1.service_id]))
end
def stabilize_promo(svcs, _), do: svcs

def stabilize(nil, attrs), do: Map.drop(attrs, [:edges])
def stabilize(pipe, attrs) do
Expand Down
13 changes: 8 additions & 5 deletions lib/console/deployments/pipelines/stage_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@ defmodule Console.Deployments.Pipelines.StageWorker do
do: GenServer.call(name(shard), stage)

def context(shard, %PipelineStage{} = stage),
do: GenServer.call(name(shard), {:context, stage}, timeout: 60_000)
do: GenServer.call(name(shard), {:context, stage}, 60_000)

def name(shard), do: {:via, Registry, {Supervisor.registry(), {:stage, :shard, shard}}}

def handle_call({:context, stage}, state) do
case Pipelines.apply_pipeline_context(stage) do
def handle_call({:context, stage}, _, state) do
Logger.info "starting to apply context for stage #{stage.id} (#{stage.name})"
case Pipelines.apply_pipeline_context(refetch(stage)) do
{:ok, _} ->
Logger.info "stage #{stage.id} context applied successfully"
{:reply, :ok, state}
{:error, err} ->
Logger.info "failed to apply stage context #{stage.id} reason: #{inspect(err)}"
Pipelines.add_stage_error(stage, "context", "Failed to apply stage context with error: #{format_error(err)}")
{:noreply, :error, state}
{:reply, :error, state}
end
end

def handle_call(%PipelineStage{} = stage, _, state) do
Logger.info "maybe building promotion for #{stage.id} [#{stage.name}]"
case Pipelines.build_promotion(stage) do
{:ok, _} -> Logger.info "stage #{stage.id} applied successfully"
{:ok, _} = promo -> Logger.info "stage #{stage.id} applied successfully: #{inspect(promo)}"
{:error, err} -> Logger.info "failed to apply stage #{stage.id} reason: #{inspect(err)}"
end
{:reply, :ok, state}
Expand All @@ -52,6 +53,8 @@ defmodule Console.Deployments.Pipelines.StageWorker do

def handle_info(_, state), do: {:noreply, state}

defp refetch(%PipelineStage{id: id}), do: Console.Repo.get(PipelineStage, id)

defp format_error(err) when is_binary(err), do: "\n#{err}"
defp format_error(err), do: inspect(err)
end
3 changes: 2 additions & 1 deletion lib/console/deployments/pubsub/consumers/pipeline.ex
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
defmodule Console.Deployments.PubSub.Pipeline do
use Piazza.PubSub.Consumer,
broadcaster: Console.PubSub.Broadcaster,
max_demand: 10
max_demand: 20
alias Console.Deployments.PubSub.Pipelineable
alias Console.Schema.{PipelineStage, PipelinePromotion}
alias Console.Deployments.Pipelines.Discovery

require Logger

def handle_event(event) do
case Pipelineable.pipe(event) do
Expand Down
10 changes: 7 additions & 3 deletions lib/console/deployments/pubsub/protocols/pipelineable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@ defimpl Console.Deployments.PubSub.Pipelineable, for: Console.PubSub.ServiceComp
%{stage_services: [_ | _] = ss} -> Enum.map(ss, & &1.stage)
_ -> :ok
end
else
Logger.info "Service #{svc.id} has no recent updates"
end
end
def pipe(_), do: :ok
end

defimpl Console.Deployments.PubSub.Pipelineable, for: Console.PubSub.PipelineGateApproved do
def pipe(%{item: gate}) do
defimpl Console.Deployments.PubSub.Pipelineable, for: [Console.PubSub.PipelineGateApproved, Console.PubSub.PipelineGateUpdated] do
alias Console.Schema.{PipelineGate, PipelineEdge, PipelineStage}
def pipe(%{item: %PipelineGate{state: :open} = gate}) do
case Console.Repo.preload(gate, [edge: [from: :promotion]]) do
%{edge: %{from: %{promotion: promo}}} -> promo
%PipelineGate{edge: %PipelineEdge{from: %PipelineStage{promotion: promo}}} -> promo
_ -> :ok
end
end
def pipe(_), do: :ok
end
14 changes: 6 additions & 8 deletions lib/console/deployments/pubsub/rtc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ defimpl Console.PubSub.Rtc, for: [Console.PubSub.PipelineGateUpdated, Console.Pu
end
end

# defimpl Console.PubSub.Rtc, for: [
# Console.PubSub.ServiceUpdated,
# Console.PubSub.ClusterUpdated,
# Console.PubSub.ProviderUpdated,
# Console.PubSub.GitRepositoryUpdated,
# ] do
# def deliver(%{item: item}), do: {item, :update}
# end
defimpl Console.PubSub.Rtc, for: [
Console.PubSub.ServiceUpdated,
Console.PubSub.ServiceComponentsUpdated,
] do
def deliver(%{item: item}), do: {item, :update}
end

# defimpl Console.PubSub.Rtc, for: [
# Console.PubSub.ServiceCreated,
Expand Down
1 change: 1 addition & 0 deletions lib/console/graphql/deployments.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,6 @@
object :deployment_subscriptions do
import_fields :stack_subscriptions
import_fields :pipeline_subscriptions
import_fields :service_subscriptions
end
end
11 changes: 11 additions & 0 deletions lib/console/graphql/deployments/service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,15 @@ defmodule Console.GraphQl.Deployments.Service do
safe_resolve &Deployments.delete_service_context/2
end
end

object :service_subscriptions do
field :service_deployment_delta, :service_deployment_delta do
arg :id, non_null(:id)

config fn args, ctx ->
with {:ok, svc} <- Deployments.resolve_service(args, ctx),
do: {:ok, topic: "services:#{svc.id}"}
end
end
end
end
6 changes: 4 additions & 2 deletions lib/console/graphql/resolvers/deployments/service.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ defmodule Console.GraphQl.Resolvers.Deployments.Service do
def kick_service(%{service_id: id}, %{context: %{current_user: user}}),
do: Services.kick(id, user)

def update_service_components(%{id: id} = args, %{context: %{cluster: cluster}}),
do: Services.update_components(Map.take(args, [:errors, :components]), id, cluster)
def update_service_components(%{id: id} = args, %{context: %{cluster: cluster}}) do
Map.take(args, ~w(errors components sha revision_id)a)
|> Services.update_components(id, cluster)
end

def merge_service(%{id: id, configuration: config}, %{context: %{current_user: user}}),
do: Services.merge_service(config, id, user)
Expand Down
4 changes: 4 additions & 0 deletions lib/console/graphql/topic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ end
defimpl Console.GraphQl.Topic, for: Console.Schema.Pipeline do
def infer(%{id: id}, _), do: [pipeline_delta: "pipelines:#{id}"]
end

defimpl Console.GraphQl.Topic, for: Console.Schema.Service do
def infer(%{id: id}, _), do: [service_deployment_delta: "services:#{id}"]
end
Loading

0 comments on commit 4315349

Please sign in to comment.