Skip to content

Commit

Permalink
solve automation load test registration failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Tofel committed Apr 18, 2024
1 parent 1bb40d9 commit e429fb9
Showing 1 changed file with 96 additions and 60 deletions.
156 changes: 96 additions & 60 deletions integration-tests/actions/automationv2/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,17 +582,12 @@ func calculateOCR3ConfigArgs(a *AutomationTest, S []int, oracleIdentities []conf
}

func (a *AutomationTest) RegisterUpkeeps(upkeepConfigs []UpkeepConfig) ([]common.Hash, error) {
var registrarABI *abi.ABI
var err error
var registrationRequest []byte
registrationTxHashes := make([]common.Hash, 0)
concurrency := a.GetConcurrency()

type result struct {
txHash common.Hash
err error
config *UpkeepConfig
clientNum *int
txHash common.Hash
err error
}

resultCh := make(chan result, concurrency)
Expand All @@ -601,7 +596,6 @@ func (a *AutomationTest) RegisterUpkeeps(upkeepConfigs []UpkeepConfig) ([]common
wgProcesses.Add(len(upkeepConfigs))

failedRegistrations := make([]result, 0)
successfulRegistrations := make(map[int]int, 0)

// process results of each upkeep registration attempt
go func() {
Expand All @@ -612,17 +606,15 @@ func (a *AutomationTest) RegisterUpkeeps(upkeepConfigs []UpkeepConfig) ([]common
} else {
a.Logger.Trace().Msg("Registered upkeep")
registrationTxHashes = append(registrationTxHashes, r.txHash)
if _, ok := successfulRegistrations[*r.clientNum]; !ok {
successfulRegistrations[*r.clientNum] = 1
} else {
successfulRegistrations[*r.clientNum]++
}
}
wgProcesses.Done()
}
}()

var registerUpkeep = func(upkeepConfig UpkeepConfig, keyNum int, resultCh chan result) {
var registrationRequest []byte
var registrarABI *abi.ABI
var err error
switch a.RegistrySettings.RegistryVersion {
case ethereum.RegistryVersion_2_0:
registrarABI, err = keeper_registrar_wrapper2_0.KeeperRegistrarMetaData.GetAbi()
Expand Down Expand Up @@ -674,11 +666,11 @@ func (a *AutomationTest) RegisterUpkeeps(upkeepConfigs []UpkeepConfig) ([]common

tx, err := a.LinkToken.TransferAndCallFromKey(a.Registrar.Address(), upkeepConfig.FundingAmount, registrationRequest, keyNum)
if err != nil {
resultCh <- result{err: errors.Join(err, fmt.Errorf("client number %d failed to register upkeep %s", keyNum, upkeepConfig.UpkeepContract.Hex())), clientNum: &keyNum, config: &upkeepConfig}
resultCh <- result{err: errors.Join(err, fmt.Errorf("client number %d failed to register upkeep %s", keyNum, upkeepConfig.UpkeepContract.Hex()))}
return
}

resultCh <- result{txHash: tx.Hash(), clientNum: &keyNum}
resultCh <- result{txHash: tx.Hash()}
}

dividedConfigs := test_utils.DivideSlice[UpkeepConfig](upkeepConfigs, concurrency)
Expand Down Expand Up @@ -713,63 +705,55 @@ func (a *AutomationTest) RegisterUpkeeps(upkeepConfigs []UpkeepConfig) ([]common
wgProcesses.Wait()
close(resultCh)

// due to reasons unknown with high concurrency, some upkeeps fail to register
// but when retried, they succeed; this might indicate a problem with Registrar contract
// as the call that fails is from link token contract to registrar contract
if len(failedRegistrations) > 0 {
failedByClient := make(map[int]int)
for _, failed := range failedRegistrations {
failedByClient[*failed.clientNum]++
}
return nil, fmt.Errorf("failed registrations: %d | successful registrations: %d", len(failedRegistrations), len(registrationTxHashes))
}

for clientNum, failed := range failedByClient {
successful := 0
if v, ok := successfulRegistrations[clientNum]; ok {
successful = v
}
if len(registrationTxHashes) != len(upkeepConfigs) {
return nil, fmt.Errorf("failed to register all upkeeps. Expected %d, got %d", len(upkeepConfigs), len(registrationTxHashes))
}

a.Logger.Error().
Int("Client Number", clientNum).
Str("Client address", a.ChainClient.Addresses[clientNum].Hex()).
Int("Failed Registrations", failed).
Int("Successful Registrations", successful).
Msg("Failed to register upkeeps")
}
a.Logger.Info().Msg("Successfully registered all upkeeps")

resultCh = make(chan result, len(failedRegistrations))
for _, failed := range failedRegistrations {
registerUpkeep(*failed.config, *failed.clientNum, resultCh)
}
close(resultCh)
return registrationTxHashes, nil
}

func (a *AutomationTest) ConfirmUpkeepsRegistered(registrationTxHashes []common.Hash) ([]*big.Int, error) {
upkeepIds := make([]*big.Int, 0)

concurrency := a.GetConcurrency()

type result struct {
upkeepID *big.Int
err error
}

resultCh := make(chan result, concurrency)
failedConfirmation := []result{}

var wgProcesses sync.WaitGroup
wgProcesses.Add(len(registrationTxHashes))

retryErrs := make([]error, 0)
go func() {
for r := range resultCh {
if r.err != nil {
retryErrs = append(retryErrs, r.err)
a.Logger.Error().Err(r.err).Msg("Failed to confirm registered upkeep")
failedConfirmation = append(failedConfirmation, r)
} else {
registrationTxHashes = append(registrationTxHashes, r.txHash)
a.Logger.Trace().Msg("Registered upkeep")
upkeepIds = append(upkeepIds, r.upkeepID)
}
wgProcesses.Done()
}
}()

if len(retryErrs) > 0 {
return nil, fmt.Errorf("failed registrations: %d | successful registrations: %d | failed registrations retries: %d | successful registrations retries: %d", len(failedRegistrations), len(registrationTxHashes), len(retryErrs), len(failedRegistrations)-len(retryErrs))
}

a.Logger.Warn().
Int("Failed Registrations", len(failedRegistrations)).
Msg("Managed to register failed upkeeps")
}

return registrationTxHashes, nil
}

func (a *AutomationTest) ConfirmUpkeepsRegistered(registrationTxHashes []common.Hash) ([]*big.Int, error) {
upkeepIds := make([]*big.Int, 0)
for _, txHash := range registrationTxHashes {
var confirmUpkeep = func(txHash common.Hash, keyNum int, resultCh chan result) {
receipt, err := a.ChainClient.Client.TransactionReceipt(context.Background(), txHash)
if err != nil {
return nil, errors.Join(err, fmt.Errorf("failed to confirm upkeep registration"))
resultCh <- result{err: errors.Join(err, fmt.Errorf("failed to confirm upkeep registration"))}
return
}

var upkeepId *big.Int
for _, rawLog := range receipt.Logs {
parsedUpkeepId, err := a.Registry.ParseUpkeepIdFromRegisteredLog(rawLog)
Expand All @@ -779,11 +763,63 @@ func (a *AutomationTest) ConfirmUpkeepsRegistered(registrationTxHashes []common.
}
}
if upkeepId == nil {
return nil, fmt.Errorf("failed to parse upkeep id from registration receipt")
resultCh <- result{err: fmt.Errorf("failed to parse upkeep id from registration receipt")}
return
}
resultCh <- result{upkeepID: upkeepId}
}

dividedHashes := test_utils.DivideSlice[common.Hash](registrationTxHashes, concurrency)

for clientNum := 1; clientNum <= concurrency; clientNum++ {
go func(key int) {
hashes := dividedHashes[key-1]

if len(hashes) == 0 {
return
}

a.Logger.Debug().
Int("Key Number", key).
Int("Number of Configs", len(hashes)).
Msg("Started to confirm upkeeps")

for i := 0; i < len(hashes); i++ {
confirmUpkeep(hashes[i], key, resultCh)
a.Logger.Trace().
Int("Key Number", key).
Str("Done/Total", fmt.Sprintf("%d/%d", (i+1), len(hashes))).
Msg("Confirmed upkeep")
}

a.Logger.Debug().
Int("Key Number", key).
Msg("Finished to confirm upkeeps")
}(clientNum)
}

wgProcesses.Wait()
close(resultCh)

if len(failedConfirmation) > 0 {
return nil, fmt.Errorf("failed confirmations: %d | successful confirmations: %d", len(failedConfirmation), len(upkeepIds))
}

if len(registrationTxHashes) != len(upkeepIds) {
return nil, fmt.Errorf("failed to confirm all upkeeps. Expected %d, got %d", len(registrationTxHashes), len(upkeepIds))
}

seen := make(map[*big.Int]bool)
for _, upkeepId := range upkeepIds {
if seen[upkeepId] {
return nil, fmt.Errorf("duplicate upkeep id: %s. Something went wrong during upkeep confirmation. Please check the test code", upkeepId.String())
}
upkeepIds = append(upkeepIds, upkeepId)
seen[upkeepId] = true
}

a.Logger.Info().Msg("Successfully confirmed all upkeeps")
a.UpkeepIDs = upkeepIds

return upkeepIds, nil
}

Expand Down

0 comments on commit e429fb9

Please sign in to comment.