Skip to content

Commit

Permalink
try with resubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
AnieeG committed Apr 3, 2024
1 parent d946f13 commit 1cecde4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 28 deletions.
39 changes: 13 additions & 26 deletions integration-tests/ccip-tests/actions/ccip_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"math/big"
"net/http"
"runtime"
Expand Down Expand Up @@ -450,14 +449,12 @@ func (ccipModule *CCIPCommon) WaitForPriceUpdates(

func (ccipModule *CCIPCommon) WatchForPriceUpdates(ctx context.Context) error {
gasUpdateEvent := make(chan *price_registry.PriceRegistryUsdPerUnitGasUpdated)
sub, err := ccipModule.PriceRegistry.Instance.WatchUsdPerUnitGasUpdated(nil, gasUpdateEvent, nil)
if err != nil || sub == nil {
return err
}
sub := event.Resubscribe(2*time.Hour, func(ctx context.Context) (event.Subscription, error) {
return ccipModule.PriceRegistry.Instance.WatchUsdPerUnitGasUpdated(nil, gasUpdateEvent, nil)
})

go func() {
defer sub.Unsubscribe()
backoff := 5 * time.Second
for {
select {
case e := <-gasUpdateEvent:
Expand All @@ -475,18 +472,6 @@ func (ccipModule *CCIPCommon) WatchForPriceUpdates(ctx context.Context) error {
Str("price_registry", ccipModule.PriceRegistry.Address()).
Msgf("UsdPerUnitGasUpdated event received for dest chain %d source chain %s",
destChain, ccipModule.ChainClient.GetNetworkName())
case err := <-sub.Err():
if err != nil {
log.Warn().
Err(err).
Str("Backoff", backoff.String()).
Msg("error on UsdPerUnitGasUpdated subscription, attempting to resubscribe")
sub, err = ccipModule.PriceRegistry.Instance.WatchUsdPerUnitGasUpdated(nil, gasUpdateEvent, nil)
if err != nil {
time.Sleep(backoff)
backoff = time.Duration(math.Min(float64(backoff)*2, float64(30*time.Second)))
}
}
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -1806,10 +1791,11 @@ func (destCCIP *DestCCIPModule) AssertNoReportAcceptedEventReceived(lggr zerolog
ctx, cancel := context.WithTimeout(context.Background(), timeRange)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
eventFoundAfterCursing := false
var eventFoundAfterCursing *time.Time
// verify if CommitReportAccepted is received, it's not generated after provided lastSeenTimestamp
destCCIP.ReportAcceptedWatcher.Range(func(key, value any) bool {
e, exists := value.(*evm_2_evm_offramp.EVM2EVMOffRampExecutionStateChanged)
Expand All @@ -1820,14 +1806,14 @@ func (destCCIP *DestCCIPModule) AssertNoReportAcceptedEventReceived(lggr zerolog
return true
}
if hdr.Timestamp.After(lastSeenTimestamp) {
eventFoundAfterCursing = true
eventFoundAfterCursing = pointer.ToTime(hdr.Timestamp)
return false
}
}
return true
})
if eventFoundAfterCursing {
return fmt.Errorf("CommitReportAccepted Event detected after %s", lastSeenTimestamp)
if eventFoundAfterCursing != nil {
return fmt.Errorf("CommitReportAccepted Event detected at %s after %s", lastSeenTimestamp, eventFoundAfterCursing.String())
}
case <-ctx.Done():
lggr.Info().Msgf("successfully validated that no CommitReportAccepted detected after %s for %s", lastSeenTimestamp, timeRange)
Expand All @@ -1841,10 +1827,11 @@ func (destCCIP *DestCCIPModule) AssertNoExecutionStateChangedEventReceived(lggr
ctx, cancel := context.WithTimeout(context.Background(), timeRange)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
eventFoundAfterCursing := false
var eventFoundAfterCursing *time.Time
// verify if executionstate changed is received, it's not generated after provided lastSeenTimestamp
destCCIP.ExecStateChangedWatcher.Range(func(key, value any) bool {
e, exists := value.(*evm_2_evm_offramp.EVM2EVMOffRampExecutionStateChanged)
Expand All @@ -1855,14 +1842,14 @@ func (destCCIP *DestCCIPModule) AssertNoExecutionStateChangedEventReceived(lggr
return true
}
if hdr.Timestamp.After(lastSeenTimestamp) {
eventFoundAfterCursing = true
eventFoundAfterCursing = pointer.ToTime(hdr.Timestamp)
return false
}
}
return true
})
if eventFoundAfterCursing {
return fmt.Errorf("ExecutionStateChanged Event detected after %s", lastSeenTimestamp)
if eventFoundAfterCursing != nil {
return fmt.Errorf("ExecutionStateChanged Event detected at %s after %s", lastSeenTimestamp, eventFoundAfterCursing.String())
}
case <-ctx.Done():
lggr.Info().Msgf("successfully validated that no ExecutionStateChanged detected after %s for %s", lastSeenTimestamp, timeRange)
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/ccip-tests/load/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ func (l *LoadArgs) ValidateCurseFollowedByUncurse() {
errGrp.Go(func() error {
lane.Logger.Info().Msg("Validating no CommitReportAccepted event is received for 29 minutes")
// we allow additional 1 minute after curse timestamp for curse to be visible by plugin
return lane.Dest.AssertNoReportAcceptedEventReceived(lane.Logger, 29*time.Minute, curseTimeStamp.Add(1*time.Minute))
return lane.Dest.AssertNoReportAcceptedEventReceived(lane.Logger, 25*time.Minute, curseTimeStamp.Add(1*time.Minute))
})
errGrp.Go(func() error {
lane.Logger.Info().Msg("Validating no ExecutionStateChanged event is received for 29 minutes")
// we allow additional 1 minute after curse timestamp for curse to be visible by plugin
return lane.Dest.AssertNoExecutionStateChangedEventReceived(lane.Logger, 29*time.Minute, curseTimeStamp.Add(1*time.Minute))
return lane.Dest.AssertNoExecutionStateChangedEventReceived(lane.Logger, 25*time.Minute, curseTimeStamp.Add(1*time.Minute))
})
}
l.lggr.Info().Msg("waiting for no commit/execution validation")
Expand Down

0 comments on commit 1cecde4

Please sign in to comment.