From 43153494538663ed7038a932c8d1021d8713f79d Mon Sep 17 00:00:00 2001 From: michaeljguarino Date: Sat, 27 Jul 2024 19:34:10 -0400 Subject: [PATCH] Fix pipeline context genserver apply (#1227) --- config/prod.exs | 6 +- lib/console/deployments/pipelines.ex | 83 +++++++++++++------ .../deployments/pipelines/discovery.ex | 2 +- .../deployments/pipelines/promotion_worker.ex | 3 +- .../deployments/pipelines/stability.ex | 7 ++ .../deployments/pipelines/stage_worker.ex | 13 +-- .../deployments/pubsub/consumers/pipeline.ex | 3 +- .../pubsub/protocols/pipelineable.ex | 10 ++- lib/console/deployments/pubsub/rtc.ex | 14 ++-- lib/console/graphql/deployments.ex | 1 + lib/console/graphql/deployments/service.ex | 11 +++ .../graphql/resolvers/deployments/service.ex | 6 +- lib/console/graphql/topic.ex | 4 + .../{pipeline_test.exs => pipelines_test.exs} | 51 +++++++++++- .../deployments/pubsub/pipeline_test.exs | 2 +- 15 files changed, 166 insertions(+), 50 deletions(-) rename test/console/deployments/{pipeline_test.exs => pipelines_test.exs} (91%) diff --git a/config/prod.exs b/config/prod.exs index 08bccd9cbe..3bfc26883c 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -36,9 +36,9 @@ config :console, Console.Cron.Scheduler, {"*/10 * * * *", {Console.Deployments.Cron, :dequeue_stacks, []}}, {"*/10 * * * *", {Console.Deployments.Cron, :place_run_workers, []}}, {"*/30 * * * *", {Console.Deployments.Cron, :spawn_stack_crons, []}}, - {"*/2 * * * *", {Console.Deployments.Cron, :scan_pipeline_stages, []}}, - {"*/2 * * * *", {Console.Deployments.Cron, :scan_pending_promotions, []}}, - {"*/2 * * * *", {Console.Deployments.Cron, :scan_pending_contexts, []}}, + {"*/4 * * * *", {Console.Deployments.Cron, :scan_pipeline_stages, []}}, + {"*/4 * * * *", {Console.Deployments.Cron, :scan_pending_promotions, []}}, + {"*/4 * * * *", {Console.Deployments.Cron, :scan_pending_contexts, []}}, {"*/10 * * * *", {Console.Deployments.Init, :ensure_secret, []}}, {"0 0 1-31/2 * *", {Console.Deployments.Cron, :backfill_deprecations, []}}, {"*/10 * * * *", {Console.Deployments.Cron, :backfill_global_services, []}}, diff --git a/lib/console/deployments/pipelines.ex b/lib/console/deployments/pipelines.ex index 44b89fa201..ad5dd915d1 100644 --- a/lib/console/deployments/pipelines.ex +++ b/lib/console/deployments/pipelines.ex @@ -1,5 +1,6 @@ defmodule Console.Deployments.Pipelines do use Console.Services.Base + require Logger use Nebulex.Caching import Console.Deployments.Policies import Console.Deployments.Pipelines.Stability @@ -123,7 +124,8 @@ defmodule Console.Deployments.Pipelines do end) |> execute() end) - |> execute(extract: :ctx) + |> execute() + |> flush_context_events(fn %{link: links} -> Map.values(links) end, :ctx) |> notify(:create, user) end @@ -132,7 +134,6 @@ defmodule Console.Deployments.Pipelines do stage |> PipelineStage.changeset(%{context_id: ctx_id}) |> Repo.update() - |> notify(:update) end @doc """ @@ -145,8 +146,11 @@ defmodule Console.Deployments.Pipelines do |> Repo.update() end - def apply_pipeline_context(%PipelineStage{context_id: ctx_id, applied_context_id: ctx_id} = stage), - do: {:ok, stage} + def apply_pipeline_context(%PipelineStage{context_id: ctx_id, applied_context_id: ctx_id} = stage) do + Logger.info "ignoring applying existing context to stage #{stage.id}" + {:ok, stage} + end + def apply_pipeline_context(%PipelineStage{} = stage) do bot = %{Users.get_bot!("console") | roles: %{admin: true}} %{context: ctx} = stage = Repo.preload(stage, [:pipeline, :context, :errors, services: [:service, criteria: :pr_automation]]) @@ -260,6 +264,7 @@ defmodule Console.Deployments.Pipelines do |> PipelineGate.update_changeset(attrs) |> allow(cluster, :update) |> when_ok(:update) + |> notify(:update) end @doc """ @@ -306,7 +311,7 @@ defmodule Console.Deployments.Pipelines do def build_promotion(%PipelineStage{id: id} = stage) do start_transaction() |> add_operation(:stage, fn _ -> - case Repo.preload(stage, [:from_edges, :context, promotion: [services: :revision], services: [service: :revision]]) do + case Repo.preload(stage, [:context, from_edges: :gates, promotion: [services: :revision], services: [service: :revision]]) do %{from_edges: [_ | _]} = stage -> {:ok, stage} _ -> {:error, "this stage has no successors"} end @@ -322,29 +327,32 @@ defmodule Console.Deployments.Pipelines do old = extant(promo) |> Map.drop(Enum.map(svcs, fn {%{id: id}, _} -> id end)) |> Map.values() - |> Console.mapify() + |> Console.mapify() # prior services to promote w/o a healthy revision yet + # services w/ a new healthy revision new = Enum.map(svcs, fn {%{id: id, sha: sha}, %{id: rid}} -> %{service_id: id, revision_id: rid, sha: sha} end) case promo do nil -> %PipelinePromotion{stage_id: id} %PipelinePromotion{} = promo -> promo end - |> PipelinePromotion.changeset(add_revised(%{services: old ++ new}, diff?(stage, svcs, promo))) + |> PipelinePromotion.changeset(add_revised(%{services: stabilize_promo(old ++ new, promo)}, legacy_diff?(stage, svcs, promo))) |> PipelinePromotion.changeset(%{context_id: stage.context_id}) |> Repo.insert_or_update() end) - |> add_operation(:gates, fn %{stage: %{id: id}, build: promo} -> - case promo.revised do - true -> - PipelineGate.for_stage(id) - |> PipelineGate.selected() - |> Repo.update_all(set: [state: :pending, approver_id: nil]) - |> elem(1) - |> send_updates() - |> ok() - _ -> {:ok, 0} - end + |> add_operation(:revised, fn %{build: promo, services: svcs, stage: stage} -> + PipelinePromotion.changeset(promo, add_revised(%{}, diff?(stage, svcs, promo))) + |> Repo.update() end) - |> execute(extract: :build) + |> add_operation(:gates, fn + %{stage: %{id: id}, revised: %{revised: true}} -> + PipelineGate.for_stage(id) + |> PipelineGate.selected() + |> Repo.update_all(set: [state: :pending, approver_id: nil, updated_at: Timex.now()]) + |> elem(1) + |> send_updates() + |> ok() + _ -> {:ok, 0} + end) + |> execute(extract: :revised) |> notify(:create) end @@ -381,7 +389,14 @@ defmodule Console.Deployments.Pipelines do _ -> {:ok, promo} end end) - |> execute(extract: :finish) + |> execute() + |> flush_context_events(fn %{resolve: resolve} -> + Enum.filter(resolve, fn + {{:ctx, _}, _} -> true + _ -> false + end) + |> Enum.map(fn {_, stage} -> stage end) + end, :finish) end defp entry_stages(%Pipeline{stages: stages, edges: edges}) when is_list(stages) and is_list(edges) do @@ -447,11 +462,16 @@ defmodule Console.Deployments.Pipelines do defp diff?(%PipelineStage{context_id: id}, _, %PipelinePromotion{applied_context_id: id}) when is_binary(id), do: false - defp diff?(%PipelineStage{context: %PipelineContext{inserted_at: at}}, _, %PipelinePromotion{} = next) do - Enum.all?(next.services, &Timex.after?(coalesce(&1.revision.updated_at, &1.revision.inserted_at), at)) + defp diff?(%PipelineStage{context: %PipelineContext{inserted_at: at}} = promos, _, %PipelinePromotion{} = next) do + Repo.preload(next, [services: :revision], force: true) + |> Map.get(:services) + |> Enum.all?(&Timex.after?(coalesce(&1.revision.updated_at, &1.revision.inserted_at), at)) && gates_stale?(promos) end - defp diff?(_, svcs, %PipelinePromotion{services: [_ | _]} = promo) do + defp diff?(_, _, _), do: false + + defp legacy_diff?(_, [], _), do: false + defp legacy_diff?(_, svcs, %PipelinePromotion{services: [_ | _]} = promo) do by_id = extant(promo) Enum.any?(svcs, fn {%{sha: sha} = svc, %{id: r}} -> case by_id[svc.id] do @@ -461,14 +481,29 @@ defmodule Console.Deployments.Pipelines do end end) end + defp legacy_diff?(_, _, _), do: true - defp diff?(_, _, _), do: true + defp gates_stale?(%PipelineStage{context: %{inserted_at: at}, from_edges: edges}) do + Enum.flat_map(edges, & &1.gates) + |> Enum.all?(&Timex.before?(coalesce(&1.updated_at, &1.inserted_at), at)) + end + defp gates_stale?(_), do: true defp send_updates(gates) do Enum.each(gates, &handle_notify(PubSub.PipelineGateUpdated, &1)) gates end + defp flush_context_events({:ok, result}, mapper, extract) do + case mapper.(result) do + [_ | _] = stages -> Enum.each(stages, &handle_notify(PubSub.PipelineStageUpdated, &1)) + %PipelineStage{} = stage -> handle_notify(PubSub.PipelineStageUpdated, stage) + _ -> :ok + end + {:ok, result[extract]} + end + defp flush_context_events(err, _, _), do: err + defp notify({:ok, %PipelinePromotion{} = promo}, :create), do: handle_notify(PubSub.PromotionCreated, promo) defp notify({:ok, %PipelineStage{} = stage}, :update), diff --git a/lib/console/deployments/pipelines/discovery.ex b/lib/console/deployments/pipelines/discovery.ex index b5edcb0c6e..9f1ca71bc2 100644 --- a/lib/console/deployments/pipelines/discovery.ex +++ b/lib/console/deployments/pipelines/discovery.ex @@ -6,7 +6,7 @@ defmodule Console.Deployments.Pipelines.Discovery do alias Console.Schema.{PipelinePromotion, PipelineStage} alias Console.Deployments.Pipelines.{PromotionWorker, StageWorker} - @shards 5 + @shards 10 @shard_ring HashRing.new() |> HashRing.add_nodes(Enum.to_list(0..@shards)) def promotion(%PipelinePromotion{id: id} = promo), diff --git a/lib/console/deployments/pipelines/promotion_worker.ex b/lib/console/deployments/pipelines/promotion_worker.ex index c5af6f025c..fa7398c0e8 100644 --- a/lib/console/deployments/pipelines/promotion_worker.ex +++ b/lib/console/deployments/pipelines/promotion_worker.ex @@ -14,11 +14,12 @@ defmodule Console.Deployments.Pipelines.PromotionWorker do end def dispatch(shard, %PipelinePromotion{} = promo), - do: GenServer.call(name(shard), promo) + do: GenServer.call(name(shard), promo, 10_000) def name(shard), do: {:via, Registry, {Supervisor.registry(), {:promotion, :shard, shard}}} def handle_call(%PipelinePromotion{} = promo, _, state) do + Logger.info "attempting to apply promotion #{promo.id}" case Pipelines.apply_promotion(promo) do {:ok, _} -> Logger.info "promotion #{promo.id} applied successfully" {:error, err} -> Logger.info "failed to apply promotion #{promo.id} reason: #{inspect(err)}" diff --git a/lib/console/deployments/pipelines/stability.ex b/lib/console/deployments/pipelines/stability.ex index bbacd109a3..616f62558f 100644 --- a/lib/console/deployments/pipelines/stability.ex +++ b/lib/console/deployments/pipelines/stability.ex @@ -1,5 +1,12 @@ defmodule Console.Deployments.Pipelines.Stability do alias Console.Deployments.{Services, Clusters} + alias Console.Schema.PipelinePromotion + + def stabilize_promo(svcs, %PipelinePromotion{services: [_ | _] = old}) do + by_svcs = Map.new(old, & {&1.service_id, &1.id}) + Enum.map(svcs, &Map.put(&1, :id, by_svcs[&1.service_id])) + end + def stabilize_promo(svcs, _), do: svcs def stabilize(nil, attrs), do: Map.drop(attrs, [:edges]) def stabilize(pipe, attrs) do diff --git a/lib/console/deployments/pipelines/stage_worker.ex b/lib/console/deployments/pipelines/stage_worker.ex index 68126b2ab6..a5d3c73730 100644 --- a/lib/console/deployments/pipelines/stage_worker.ex +++ b/lib/console/deployments/pipelines/stage_worker.ex @@ -20,26 +20,27 @@ defmodule Console.Deployments.Pipelines.StageWorker do do: GenServer.call(name(shard), stage) def context(shard, %PipelineStage{} = stage), - do: GenServer.call(name(shard), {:context, stage}, timeout: 60_000) + do: GenServer.call(name(shard), {:context, stage}, 60_000) def name(shard), do: {:via, Registry, {Supervisor.registry(), {:stage, :shard, shard}}} - def handle_call({:context, stage}, state) do - case Pipelines.apply_pipeline_context(stage) do + def handle_call({:context, stage}, _, state) do + Logger.info "starting to apply context for stage #{stage.id} (#{stage.name})" + case Pipelines.apply_pipeline_context(refetch(stage)) do {:ok, _} -> Logger.info "stage #{stage.id} context applied successfully" {:reply, :ok, state} {:error, err} -> Logger.info "failed to apply stage context #{stage.id} reason: #{inspect(err)}" Pipelines.add_stage_error(stage, "context", "Failed to apply stage context with error: #{format_error(err)}") - {:noreply, :error, state} + {:reply, :error, state} end end def handle_call(%PipelineStage{} = stage, _, state) do Logger.info "maybe building promotion for #{stage.id} [#{stage.name}]" case Pipelines.build_promotion(stage) do - {:ok, _} -> Logger.info "stage #{stage.id} applied successfully" + {:ok, _} = promo -> Logger.info "stage #{stage.id} applied successfully: #{inspect(promo)}" {:error, err} -> Logger.info "failed to apply stage #{stage.id} reason: #{inspect(err)}" end {:reply, :ok, state} @@ -52,6 +53,8 @@ defmodule Console.Deployments.Pipelines.StageWorker do def handle_info(_, state), do: {:noreply, state} + defp refetch(%PipelineStage{id: id}), do: Console.Repo.get(PipelineStage, id) + defp format_error(err) when is_binary(err), do: "\n#{err}" defp format_error(err), do: inspect(err) end diff --git a/lib/console/deployments/pubsub/consumers/pipeline.ex b/lib/console/deployments/pubsub/consumers/pipeline.ex index 12585baa91..18dc3ac58c 100644 --- a/lib/console/deployments/pubsub/consumers/pipeline.ex +++ b/lib/console/deployments/pubsub/consumers/pipeline.ex @@ -1,11 +1,12 @@ defmodule Console.Deployments.PubSub.Pipeline do use Piazza.PubSub.Consumer, broadcaster: Console.PubSub.Broadcaster, - max_demand: 10 + max_demand: 20 alias Console.Deployments.PubSub.Pipelineable alias Console.Schema.{PipelineStage, PipelinePromotion} alias Console.Deployments.Pipelines.Discovery + require Logger def handle_event(event) do case Pipelineable.pipe(event) do diff --git a/lib/console/deployments/pubsub/protocols/pipelineable.ex b/lib/console/deployments/pubsub/protocols/pipelineable.ex index 8711920d7f..b0587afc8e 100644 --- a/lib/console/deployments/pubsub/protocols/pipelineable.ex +++ b/lib/console/deployments/pubsub/protocols/pipelineable.ex @@ -25,16 +25,20 @@ defimpl Console.Deployments.PubSub.Pipelineable, for: Console.PubSub.ServiceComp %{stage_services: [_ | _] = ss} -> Enum.map(ss, & &1.stage) _ -> :ok end + else + Logger.info "Service #{svc.id} has no recent updates" end end def pipe(_), do: :ok end -defimpl Console.Deployments.PubSub.Pipelineable, for: Console.PubSub.PipelineGateApproved do - def pipe(%{item: gate}) do +defimpl Console.Deployments.PubSub.Pipelineable, for: [Console.PubSub.PipelineGateApproved, Console.PubSub.PipelineGateUpdated] do + alias Console.Schema.{PipelineGate, PipelineEdge, PipelineStage} + def pipe(%{item: %PipelineGate{state: :open} = gate}) do case Console.Repo.preload(gate, [edge: [from: :promotion]]) do - %{edge: %{from: %{promotion: promo}}} -> promo + %PipelineGate{edge: %PipelineEdge{from: %PipelineStage{promotion: promo}}} -> promo _ -> :ok end end + def pipe(_), do: :ok end diff --git a/lib/console/deployments/pubsub/rtc.ex b/lib/console/deployments/pubsub/rtc.ex index 2b5437cf0b..371eba6936 100644 --- a/lib/console/deployments/pubsub/rtc.ex +++ b/lib/console/deployments/pubsub/rtc.ex @@ -20,14 +20,12 @@ defimpl Console.PubSub.Rtc, for: [Console.PubSub.PipelineGateUpdated, Console.Pu end end -# defimpl Console.PubSub.Rtc, for: [ -# Console.PubSub.ServiceUpdated, -# Console.PubSub.ClusterUpdated, -# Console.PubSub.ProviderUpdated, -# Console.PubSub.GitRepositoryUpdated, -# ] do -# def deliver(%{item: item}), do: {item, :update} -# end +defimpl Console.PubSub.Rtc, for: [ + Console.PubSub.ServiceUpdated, + Console.PubSub.ServiceComponentsUpdated, +] do + def deliver(%{item: item}), do: {item, :update} +end # defimpl Console.PubSub.Rtc, for: [ # Console.PubSub.ServiceCreated, diff --git a/lib/console/graphql/deployments.ex b/lib/console/graphql/deployments.ex index eb7a490fa9..3e447f0b42 100644 --- a/lib/console/graphql/deployments.ex +++ b/lib/console/graphql/deployments.ex @@ -104,5 +104,6 @@ object :deployment_subscriptions do import_fields :stack_subscriptions import_fields :pipeline_subscriptions + import_fields :service_subscriptions end end diff --git a/lib/console/graphql/deployments/service.ex b/lib/console/graphql/deployments/service.ex index 98f72f87fd..6592fda2bd 100644 --- a/lib/console/graphql/deployments/service.ex +++ b/lib/console/graphql/deployments/service.ex @@ -618,4 +618,15 @@ defmodule Console.GraphQl.Deployments.Service do safe_resolve &Deployments.delete_service_context/2 end end + + object :service_subscriptions do + field :service_deployment_delta, :service_deployment_delta do + arg :id, non_null(:id) + + config fn args, ctx -> + with {:ok, svc} <- Deployments.resolve_service(args, ctx), + do: {:ok, topic: "services:#{svc.id}"} + end + end + end end diff --git a/lib/console/graphql/resolvers/deployments/service.ex b/lib/console/graphql/resolvers/deployments/service.ex index cca08ea40d..fab9cde9c6 100644 --- a/lib/console/graphql/resolvers/deployments/service.ex +++ b/lib/console/graphql/resolvers/deployments/service.ex @@ -165,8 +165,10 @@ defmodule Console.GraphQl.Resolvers.Deployments.Service do def kick_service(%{service_id: id}, %{context: %{current_user: user}}), do: Services.kick(id, user) - def update_service_components(%{id: id} = args, %{context: %{cluster: cluster}}), - do: Services.update_components(Map.take(args, [:errors, :components]), id, cluster) + def update_service_components(%{id: id} = args, %{context: %{cluster: cluster}}) do + Map.take(args, ~w(errors components sha revision_id)a) + |> Services.update_components(id, cluster) + end def merge_service(%{id: id, configuration: config}, %{context: %{current_user: user}}), do: Services.merge_service(config, id, user) diff --git a/lib/console/graphql/topic.ex b/lib/console/graphql/topic.ex index 3ea5bf312e..a5489237f1 100644 --- a/lib/console/graphql/topic.ex +++ b/lib/console/graphql/topic.ex @@ -35,3 +35,7 @@ end defimpl Console.GraphQl.Topic, for: Console.Schema.Pipeline do def infer(%{id: id}, _), do: [pipeline_delta: "pipelines:#{id}"] end + +defimpl Console.GraphQl.Topic, for: Console.Schema.Service do + def infer(%{id: id}, _), do: [service_deployment_delta: "services:#{id}"] +end diff --git a/test/console/deployments/pipeline_test.exs b/test/console/deployments/pipelines_test.exs similarity index 91% rename from test/console/deployments/pipeline_test.exs rename to test/console/deployments/pipelines_test.exs index f86530ece0..f9bf23f8f4 100644 --- a/test/console/deployments/pipeline_test.exs +++ b/test/console/deployments/pipelines_test.exs @@ -238,13 +238,15 @@ defmodule Console.Deployments.PipelinesTest do expect(Console.Deployments.Pr.Dispatcher, :create, fn _, _, %{"some" => "context"} -> {:ok, %{title: "some", url: "url"}} end) - {:ok, %{stg: stage}} = Pipelines.apply_pipeline_context(dev) + {:ok, %{stg: %{id: id} = stage}} = Pipelines.apply_pipeline_context(dev) assert stage.applied_context_id == ctx.id pipe_pr = Console.Repo.get_by(Console.Schema.PipelinePullRequest, context_id: ctx.id, service_id: svc.id) %{pull_request: pr} = Console.Repo.preload(pipe_pr, [:pull_request]) assert pr.service_id == svc.id + + assert_receive {:event, %PubSub.PipelineStageUpdated{item: %{id: ^id}}} end end @@ -486,6 +488,8 @@ defmodule Console.Deployments.PipelinesTest do assert updated.id == job.id assert updated.cluster_id == cluster.id assert updated.state == :open + + assert_receive {:event, %PubSub.PipelineGateUpdated{item: ^updated}} end test "agents cannot update gates for other clusters" do @@ -558,6 +562,51 @@ defmodule Console.Deployments.PipelinesTest do assert secrets["other"] == "other-value" end + test "it can apply a pr based promotion to all adjacent pipeline stages" do + admin = admin_user() + cluster = insert(:cluster) + repository = insert(:git_repository) + + pipe = insert(:pipeline) + ctx = insert(:pipeline_context, pipeline: pipe) + dev = insert(:pipeline_stage, pipeline: pipe, context: ctx) + %{id: prd_id} = prod = insert(:pipeline_stage, pipeline: pipe) + edge = insert(:pipeline_edge, pipeline: pipe, from: dev, to: prod) + + {:ok, dev_svc} = create_service(cluster, admin, [ + name: "dev", + namespace: "test", + git: %{ref: "master", folder: "k8s"}, + sha: "test-sha", + repository_id: repository.id, + configuration: [%{name: "name", value: "new-value"}] + ]) + dev_svc = Console.Repo.preload(dev_svc, [:revision]) + + {:ok, prod_svc} = create_service(cluster, admin, [ + name: "prod", + namespace: "test", + git: %{ref: "master", folder: "k8s"}, + repository_id: repository.id, + configuration: [%{name: "name", value: "value"}, %{name: "other", value: "other-value"}] + ]) + + insert(:stage_service, stage: dev, service: dev_svc) + insert(:stage_service, stage: prod, service: prod_svc) + promo = insert(:pipeline_promotion, stage: dev, revised_at: Timex.now(), context: ctx) + insert(:promotion_service, promotion: promo, service: dev_svc, revision: dev_svc.revision) + + {:ok, promod} = Pipelines.apply_promotion(promo) + + assert promod.id == promo.id + assert promod.promoted_at + assert refetch(edge).promoted_at + + assert refetch(prod).context_id == ctx.id + + assert_receive {:event, %PubSub.PipelineStageUpdated{item: %{id: ^prd_id}}} + end + test "it will block promotion if a gate is not open" do admin = admin_user() cluster = insert(:cluster) diff --git a/test/console/deployments/pubsub/pipeline_test.exs b/test/console/deployments/pubsub/pipeline_test.exs index 23b5980e16..c1b3a20299 100644 --- a/test/console/deployments/pubsub/pipeline_test.exs +++ b/test/console/deployments/pubsub/pipeline_test.exs @@ -34,7 +34,7 @@ defmodule Console.Deployments.PubSub.PipelineTest do describe "PipelineGateApproved" do test "it will attempt to apply a promotion on approval" do - gate = insert(:pipeline_gate) + gate = insert(:pipeline_gate, state: :open) promo = insert(:pipeline_promotion, stage: gate.edge.from) expect(Console.Deployments.Pipelines.Discovery, :promotion, & {:ok, &1})