Skip to content

Commit

Permalink
use a map to get difference between oracles to start and stop (#15118)
Browse files Browse the repository at this point in the history
* use a map to get difference between oracles to start and stop

* lint

* size the slice dynamically

* use error group to handle mutex and waitgroup

* remove chained iota

Co-authored-by: Makram <[email protected]>

---------

Co-authored-by: Makram <[email protected]>
  • Loading branch information
0xAustinWang and makramkd authored Nov 5, 2024
1 parent bf72fe3 commit 97abe0f
Showing 1 changed file with 44 additions and 38 deletions.
82 changes: 44 additions & 38 deletions core/capabilities/ccip/launcher/deployment.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package launcher

import (
"errors"
"fmt"
"sync"

mapset "github.com/deckarep/golang-set/v2"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"go.uber.org/multierr"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types"
)
Expand Down Expand Up @@ -35,47 +34,54 @@ func (c pluginRegistry) CloseAll() error {
// If any of the previous config digests are no longer present, we need to shut those down
// We don't care about if they're exec/commit or active/candidate, that all happens in the plugin
func (c pluginRegistry) TransitionFrom(prevPlugins pluginRegistry) error {
var allErrs error

if len(c) > MaxPlugins || len(prevPlugins) > MaxPlugins {
return fmt.Errorf("current pluginRegistry or prevPlugins have more than 4 instances: len(prevPlugins): %d, len(currPlugins): %d", len(prevPlugins), len(c))
}

var wg sync.WaitGroup
var mu sync.Mutex
// This shuts down instances that were present previously, but are no longer needed
for digest, oracle := range prevPlugins {
if _, ok := c[digest]; !ok {
wg.Add(1)
go func(o cctypes.CCIPOracle) {
defer wg.Done()
if err := o.Close(); err != nil {
mu.Lock()
allErrs = multierr.Append(allErrs, err)
mu.Unlock()
}
}(oracle)
}
prevOracles := mapset.NewSet[ocrtypes.ConfigDigest](maps.Keys(prevPlugins)...)
currOracles := mapset.NewSet[ocrtypes.ConfigDigest](maps.Keys(c)...)

var ops = make([]syncAction, 0, 2*MaxPlugins)
for digest := range prevOracles.Difference(currOracles).Iterator().C {
ops = append(ops, syncAction{
command: closeAction,
oracle: prevPlugins[digest],
})
}

for digest := range currOracles.Difference(prevOracles).Iterator().C {
ops = append(ops, syncAction{
command: openAction,
oracle: c[digest],
})
}
wg.Wait()

// This will start the instances that were not previously present, but are in the new config
for digest, oracle := range c {
if digest == [32]byte{} {
allErrs = multierr.Append(allErrs, errors.New("cannot start a plugin with an empty config digest"))
} else if _, ok := prevPlugins[digest]; !ok {
wg.Add(1)
go func(o cctypes.CCIPOracle) {
defer wg.Done()
if err := o.Start(); err != nil {
mu.Lock()
allErrs = multierr.Append(allErrs, err)
mu.Unlock()
g := new(errgroup.Group)
for _, op := range ops {
op := op
g.Go(func() error {
if op.command == closeAction {
if err := op.oracle.Close(); err != nil {
return err
}
}(oracle)
}
} else if op.command == openAction {
if err := op.oracle.Start(); err != nil {
return err
}
}
return nil
})
}
wg.Wait()

return allErrs
return g.Wait()
}

const (
closeAction = iota
openAction
)

type syncAction struct {
command int
oracle cctypes.CCIPOracle
}

0 comments on commit 97abe0f

Please sign in to comment.