Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ccip launcher to mirror on chain state(CCIP-2687) #15065

Merged
merged 16 commits into from
Nov 4, 2024
Merged
179 changes: 59 additions & 120 deletions core/capabilities/ccip/launcher/deployment.go
Original file line number Diff line number Diff line change
@@ -1,142 +1,81 @@
package launcher

import (
"errors"
"fmt"
"sync"

cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"go.uber.org/multierr"

ccipreaderpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"
)

// activeCandidateDeployment represents a active-candidate deployment of OCR instances.
type activeCandidateDeployment struct {
// active is the active OCR instance.
// active must always be present.
active cctypes.CCIPOracle

// candidate is the candidate OCR instance.
// candidate may or may not be present.
// candidate must never be present if active is not present.
candidate cctypes.CCIPOracle
}

// ccipDeployment represents active-candidate deployments of both commit and exec
// OCR instances.
type ccipDeployment struct {
commit activeCandidateDeployment
exec activeCandidateDeployment
}

// Close shuts down all OCR instances in the deployment.
func (c *ccipDeployment) Close() error {
// we potentially run into this situation when
// trying to close an active instance that doesn't exist
// this check protects us from nil pointer exception

if c == nil {
return nil
}
var err error

// shutdown active commit instance.
if c.commit.active != nil {
err = multierr.Append(err, c.commit.active.Close())
}

// shutdown candidate commit instance.
if c.commit.candidate != nil {
err = multierr.Append(err, c.commit.candidate.Close())
}

// shutdown active exec instance.
if c.exec.active != nil {
err = multierr.Append(err, c.exec.active.Close())
}
// MaxPlugins is the maximum number of plugins possible.
// A plugin represents a possible combination of (active/candidate) x (commit/exec)
// If we ever have more than 4 plugins in a prev or desired state, something went wrong
const MaxPlugins = 4

// shutdown candidate exec instance.
if c.exec.candidate != nil {
err = multierr.Append(err, c.exec.candidate.Close())
}
type pluginRegistry map[ocrtypes.ConfigDigest]cctypes.CCIPOracle

return err
// StartAll will call Oracle.Start on an entire don
func (c pluginRegistry) StartAll() error {
emptyPluginRegistry := make(pluginRegistry)
return c.TransitionFrom(emptyPluginRegistry)
}

// StartActive starts the active OCR instances.
func (c *ccipDeployment) StartActive() error {
var err error

err = multierr.Append(err, c.commit.active.Start())
err = multierr.Append(err, c.exec.active.Start())

return err
// CloseAll is used to shut down an entire don immediately
func (c pluginRegistry) CloseAll() error {
emptyPluginRegistry := make(pluginRegistry)
return emptyPluginRegistry.TransitionFrom(c)
}

// CloseActive shuts down the active OCR instances.
func (c *ccipDeployment) CloseActive() error {
var err error
// TransitionFrom manages starting and stopping ocr instances
// If there are any new config digests, we need to start those instances
// If any of the previous config digests are no longer present, we need to shut those down
// We don't care about if they're exec/commit or active/candidate, that all happens in the plugin
func (c pluginRegistry) TransitionFrom(prevPlugins pluginRegistry) error {
var allErrs error

err = multierr.Append(err, c.commit.active.Close())
err = multierr.Append(err, c.exec.active.Close())

return err
}

// TransitionDeployment handles the active-candidate deployment transition.
// prevDeployment is the previous deployment state.
// there are two possible cases:
//
// 1. both active and candidate are present in prevDeployment, but only active is present in c.
// this is a promotion of candidate to active, so we need to shut down the active deployment
// and make candidate the new active. In this case candidate is already running, so there's no
// need to start it. However, we need to shut down the active deployment.
//
// 2. only active is present in prevDeployment, both active and candidate are present in c.
// In this case, active is already running, so there's no need to start it. We need to
// start candidate.
func (c *ccipDeployment) TransitionDeployment(prevDeployment *ccipDeployment) error {
if prevDeployment == nil {
return fmt.Errorf("previous deployment is nil")
if len(c) > MaxPlugins || len(prevPlugins) > MaxPlugins {
return fmt.Errorf("current pluginRegistry or prevPlugins have more than 4 instances: len(prevPlugins): %d, len(currPlugins): %d", len(prevPlugins), len(c))
}

var err error
if prevDeployment.commit.candidate != nil && c.commit.candidate == nil {
err = multierr.Append(err, prevDeployment.commit.active.Close())
} else if prevDeployment.commit.candidate == nil && c.commit.candidate != nil {
err = multierr.Append(err, c.commit.candidate.Start())
} else {
return fmt.Errorf("invalid active-candidate deployment transition")
var wg sync.WaitGroup
var mu sync.Mutex
// This shuts down instances that were present previously, but are no longer needed
for digest, oracle := range prevPlugins {
if _, ok := c[digest]; !ok {
wg.Add(1)
go func(o cctypes.CCIPOracle) {
defer wg.Done()
if err := o.Close(); err != nil {
mu.Lock()
allErrs = multierr.Append(allErrs, err)
mu.Unlock()
}
}(oracle)
}
}

if prevDeployment.exec.candidate != nil && c.exec.candidate == nil {
err = multierr.Append(err, prevDeployment.exec.active.Close())
} else if prevDeployment.exec.candidate == nil && c.exec.candidate != nil {
err = multierr.Append(err, c.exec.candidate.Start())
} else {
return fmt.Errorf("invalid active-candidate deployment transition")
wg.Wait()

// This will start the instances that were not previously present, but are in the new config
for digest, oracle := range c {
if digest == [32]byte{} {
allErrs = multierr.Append(allErrs, errors.New("cannot start a plugin with an empty config digest"))
} else if _, ok := prevPlugins[digest]; !ok {
wg.Add(1)
go func(o cctypes.CCIPOracle) {
defer wg.Done()
if err := o.Start(); err != nil {
mu.Lock()
allErrs = multierr.Append(allErrs, err)
mu.Unlock()
}
}(oracle)
}
}
wg.Wait()

return err
}

// HasCandidateInstance returns true if the deployment has a candidate instance for the
// given plugin type.
func (c *ccipDeployment) HasCandidateInstance(pluginType cctypes.PluginType) bool {
switch pluginType {
case cctypes.PluginTypeCCIPCommit:
return c.commit.candidate != nil
case cctypes.PluginTypeCCIPExec:
return c.exec.candidate != nil
default:
return false
}
}

func isNewCandidateInstance(pluginType cctypes.PluginType, ocrConfigs []ccipreaderpkg.OCR3ConfigWithMeta, prevDeployment ccipDeployment) bool {
return len(ocrConfigs) == 2 && !prevDeployment.HasCandidateInstance(pluginType)
}

func isPromotion(pluginType cctypes.PluginType, ocrConfigs []ccipreaderpkg.OCR3ConfigWithMeta, prevDeployment ccipDeployment) bool {
return len(ocrConfigs) == 1 && prevDeployment.HasCandidateInstance(pluginType)
return allErrs
}
Loading
Loading