Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[readState] tolerant short delay when read from contract staking indexer #4458

Merged
merged 7 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions action/protocol/staking/contractstake_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package staking
import (
_ "embed"
"strings"
"time"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/iotexproject/iotex-address/address"
Expand All @@ -25,6 +26,7 @@ type (

// ContractStakingIndexer defines the interface of contract staking reader
ContractStakingIndexer interface {
Height() (uint64, error)
// Buckets returns active buckets
Buckets(height uint64) ([]*VoteBucket, error)
// BucketsByIndices returns active buckets by indices
Expand All @@ -42,6 +44,17 @@ type (
// BucketTypes returns the active bucket types
BucketTypes(height uint64) ([]*ContractStakingBucketType, error)
}

delayTolerantIndexer struct {
ContractStakingIndexer
duration time.Duration
startHeight uint64
}

delayTolerantIndexerWithBucketType struct {
*delayTolerantIndexer
envestcc marked this conversation as resolved.
Show resolved Hide resolved
indexer ContractStakingIndexerWithBucketType
}
)

func init() {
Expand All @@ -51,3 +64,93 @@ func init() {
panic(err)
}
}

// NewDelayTolerantIndexer creates a delay tolerant indexer
func NewDelayTolerantIndexer(indexer ContractStakingIndexer, duration time.Duration) ContractStakingIndexer {
d := &delayTolerantIndexer{ContractStakingIndexer: indexer, duration: duration}
if indexWithStart, ok := indexer.(interface{ StartHeight() uint64 }); ok {
d.startHeight = indexWithStart.StartHeight()
}
return d
}

// NewDelayTolerantIndexerWithBucketType creates a delay tolerant indexer with bucket type
func NewDelayTolerantIndexerWithBucketType(indexer ContractStakingIndexerWithBucketType, duration time.Duration) ContractStakingIndexerWithBucketType {
return &delayTolerantIndexerWithBucketType{
NewDelayTolerantIndexer(indexer, duration).(*delayTolerantIndexer),
indexer,
}
}

func (c *delayTolerantIndexer) wait(height uint64) (bool, error) {
// first check if the height is already reached
if c.startHeight >= height {
return false, nil
}
h, err := c.Height()
if err != nil {
return false, err
}
if h >= height {
return true, nil
}
// wait for the height to be reached
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(c.duration)
defer timer.Stop()
for {
select {
case <-ticker.C:
h, err := c.Height()
if err != nil {
return false, err
}
if h >= height {
return true, nil
}
case <-timer.C:
return false, nil
}
}
}

func (c *delayTolerantIndexer) Buckets(height uint64) ([]*VoteBucket, error) {
_, err := c.wait(height)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
return c.ContractStakingIndexer.Buckets(height)
envestcc marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *delayTolerantIndexer) BucketsByIndices(indices []uint64, height uint64) ([]*VoteBucket, error) {
_, err := c.wait(height)
if err != nil {
return nil, err
}
return c.ContractStakingIndexer.BucketsByIndices(indices, height)
}

func (c *delayTolerantIndexer) BucketsByCandidate(ownerAddr address.Address, height uint64) ([]*VoteBucket, error) {
_, err := c.wait(height)
if err != nil {
return nil, err
}
return c.ContractStakingIndexer.BucketsByCandidate(ownerAddr, height)
}

func (c *delayTolerantIndexer) TotalBucketCount(height uint64) (uint64, error) {
_, err := c.wait(height)
if err != nil {
return 0, err
}
return c.ContractStakingIndexer.TotalBucketCount(height)
}

func (c *delayTolerantIndexerWithBucketType) BucketTypes(height uint64) ([]*ContractStakingBucketType, error) {
_, err := c.wait(height)
if err != nil {
return nil, err
}
return c.indexer.BucketTypes(height)
}
30 changes: 30 additions & 0 deletions action/protocol/staking/contractstake_indexer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 100 additions & 0 deletions action/protocol/staking/contractstake_indexer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package staking

import (
"errors"
"math/big"
"sync/atomic"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/iotexproject/iotex-address/address"
"github.com/stretchr/testify/require"
)

func TestDelayTolerantIndexer(t *testing.T) {
r := require.New(t)
ctrl := gomock.NewController(t)
indexer := NewMockContractStakingIndexerWithBucketType(ctrl)
delayIndexer := NewDelayTolerantIndexerWithBucketType(indexer, time.Second)

var (
indexerHeight = uint64(10)
indexerBuckets = []*VoteBucket{{Index: 1}}
indexerAddress = address.ZeroAddress
indexerBucketTypes = []*ContractStakingBucketType{{Amount: big.NewInt(1)}}
)

// Height
indexer.EXPECT().Height().DoAndReturn(func() (uint64, error) {
return atomic.LoadUint64(&indexerHeight), nil
}).AnyTimes()

height, err := delayIndexer.Height()
r.NoError(err)
r.Equal(atomic.LoadUint64(&indexerHeight), height)
// Buckets
indexer.EXPECT().Buckets(gomock.Any()).DoAndReturn(func(height uint64) ([]*VoteBucket, error) {
if height <= atomic.LoadUint64(&indexerHeight) {
return indexerBuckets, nil
}
return nil, errors.New("invalid height")
}).AnyTimes()
noDelayHeight, delayHeight := atomic.LoadUint64(&indexerHeight), atomic.LoadUint64(&indexerHeight)+1
bkts, err := delayIndexer.Buckets(noDelayHeight)
r.NoError(err)
r.Equal(indexerBuckets, bkts)
bkts, err = delayIndexer.Buckets(delayHeight)
r.ErrorContains(err, "invalid height")
go func() {
time.Sleep(100 * time.Millisecond)
atomic.StoreUint64(&indexerHeight, delayHeight)
}()
bkts, err = delayIndexer.Buckets(delayHeight)
r.NoError(err)
r.Equal(indexerBuckets, bkts)
// BucketsByIndices
indexer.EXPECT().BucketsByIndices(gomock.Any(), gomock.Any()).DoAndReturn(func(indices []uint64, height uint64) ([]*VoteBucket, error) {
if height <= atomic.LoadUint64(&indexerHeight) {
return indexerBuckets, nil
}
return nil, errors.New("invalid height")
}).AnyTimes()
bkts, err = delayIndexer.BucketsByIndices(nil, delayHeight)
r.NoError(err)
r.Equal(indexerBuckets, bkts)
// BucketsByCandidate
indexer.EXPECT().BucketsByCandidate(gomock.Any(), gomock.Any()).DoAndReturn(func(ownerAddr address.Address, height uint64) ([]*VoteBucket, error) {
if height <= atomic.LoadUint64(&indexerHeight) {
return indexerBuckets, nil
}
return nil, errors.New("invalid height")
}).AnyTimes()
bkts, err = delayIndexer.BucketsByCandidate(nil, delayHeight)
r.NoError(err)
r.Equal(indexerBuckets, bkts)
// TotalBucketCount
indexer.EXPECT().TotalBucketCount(gomock.Any()).DoAndReturn(func(height uint64) (uint64, error) {
if height <= atomic.LoadUint64(&indexerHeight) {
return uint64(len(indexerBuckets)), nil
}
return 0, errors.New("invalid height")
}).AnyTimes()
count, err := delayIndexer.TotalBucketCount(delayHeight)
r.NoError(err)
r.Equal(uint64(len(indexerBuckets)), count)
// ContractAddress
indexer.EXPECT().ContractAddress().Return(indexerAddress).AnyTimes()
ca := delayIndexer.ContractAddress()
r.Equal(indexerAddress, ca)
// BucketTypes
indexer.EXPECT().BucketTypes(gomock.Any()).DoAndReturn(func(height uint64) ([]*ContractStakingBucketType, error) {
if height <= atomic.LoadUint64(&indexerHeight) {
return indexerBucketTypes, nil
}
return nil, errors.New("invalid height")
})
bucketTypes, err := delayIndexer.BucketTypes(delayHeight)
r.NoError(err)
r.Equal(indexerBucketTypes, bucketTypes)
}
4 changes: 2 additions & 2 deletions action/protocol/staking/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,10 @@ func (p *Protocol) ReadState(ctx context.Context, sr protocol.StateReader, metho
// stakeSR is the stake state reader including native and contract staking
indexers := []ContractStakingIndexer{}
if p.contractStakingIndexer != nil {
indexers = append(indexers, p.contractStakingIndexer)
indexers = append(indexers, NewDelayTolerantIndexerWithBucketType(p.contractStakingIndexer, time.Second))
}
if p.contractStakingIndexerV2 != nil {
indexers = append(indexers, p.contractStakingIndexerV2)
indexers = append(indexers, NewDelayTolerantIndexer(p.contractStakingIndexerV2, time.Second))
}
stakeSR, err := newCompositeStakingStateReader(p.candBucketsIndexer, sr, p.calculateVoteWeight, indexers...)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions action/protocol/staking/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ func TestProtocol_ActiveCandidates(t *testing.T) {
require.NoError(err)

var csIndexerHeight, csVotes uint64
csIndexer.EXPECT().Height().Return(uint64(0), nil).AnyTimes()
csIndexer.EXPECT().BucketsByCandidate(gomock.Any(), gomock.Any()).DoAndReturn(func(ownerAddr address.Address, height uint64) ([]*VoteBucket, error) {
if height != csIndexerHeight {
return nil, errors.Errorf("invalid height %d", height)
Expand Down
Loading