Skip to content

Commit

Permalink
Handle duplicated indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
Agusx1211 committed Feb 6, 2024
1 parent e328b56 commit 95d65fe
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
35 changes: 21 additions & 14 deletions compressor/loadstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,41 +61,48 @@ func ParseBatchResult(to map[string]uint, res []byte, offset uint) error {
return nil
}

func LoadState(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skipa uint, skipb uint, skipBlocks uint) (map[string]uint, map[string]uint, error) {
addresses, err := LoadAddresses(ctx, provider, contract, batchSize, skipa, skipBlocks)
func LoadState(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skipa uint, skipb uint, skipBlocks uint) (uint, map[string]uint, uint, map[string]uint, error) {
ah, addresses, err := LoadAddresses(ctx, provider, contract, batchSize, skipa, skipBlocks)
if err != nil {
return nil, nil, err
return 0, nil, 0, nil, err
}

bytes32, err := LoadBytes32(ctx, provider, contract, batchSize, skipb, skipBlocks)
bh, bytes32, err := LoadBytes32(ctx, provider, contract, batchSize, skipb, skipBlocks)
if err != nil {
return nil, nil, err
return 0, nil, 0, nil, err
}

return addresses, bytes32, nil
return ah, addresses, bh, bytes32, nil
}

func LoadAddresses(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skip uint, skipBlocks uint) (map[string]uint, error) {
func LoadAddresses(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skip uint, skipBlocks uint) (uint, map[string]uint, error) {
// Load total number of addresses
asize, _, err := GetTotals(ctx, provider, contract, skipBlocks)
if err != nil {
return nil, err
return 0, nil, err
}

return LoadStorage(ctx, provider, contract, batchSize, skip, asize, AddressIndex)
}

func LoadBytes32(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skip uint, skipBlocks uint) (map[string]uint, error) {
func LoadBytes32(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skip uint, skipBlocks uint) (uint, map[string]uint, error) {
// Always skip index 0 for bytes32, it maps to the size slot
// it technically can be used, but it is not write-once, so
// it will lead to decompression errors
if skip == 0 {
skip = 1
}

// Load total number of bytes32
_, bsize, err := GetTotals(ctx, provider, contract, skipBlocks)
if err != nil {
return nil, err
return 0, nil, err
}

return LoadStorage(ctx, provider, contract, batchSize, skip, bsize, Bytes32Index)
}

func LoadStorage(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skip uint, total uint, itemplate func(uint) []byte) (map[string]uint, error) {
func LoadStorage(ctx context.Context, provider *ethrpc.Provider, contract common.Address, batchSize uint, skip uint, total uint, itemplate func(uint) []byte) (uint, map[string]uint, error) {
out := make(map[string]uint)

for i := skip; i < total; i += batchSize {
Expand All @@ -107,14 +114,14 @@ func LoadStorage(ctx context.Context, provider *ethrpc.Provider, contract common
}, nil)

if err != nil {
return nil, err
return 0, nil, err
}

err = ParseBatchResult(out, res, i)
if err != nil {
return nil, err
return 0, nil, err
}
}

return out, nil
return total, out, nil
}
24 changes: 18 additions & 6 deletions compressor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/0xsequence/ethkit/ethrpc"
"github.com/0xsequence/ethkit/go-ethereum/common"
"github.com/0xsequence/go-sequence"
"github.com/davecgh/go-spew/spew"
)

type CompressorInstance struct {
Expand All @@ -26,9 +25,14 @@ type CompressorInstance struct {
TrailBlocks uint
BatchSize uint

AddressesHeight uint
Bytes32Height uint

LastIndexUpdate uint
Provider *ethrpc.Provider

onLoadedIndexes func(uint, uint)

mu *sync.RWMutex
}

Expand Down Expand Up @@ -67,15 +71,19 @@ func NewCompressorManager(
return &c
}

func (cm *CompressorManager) SetOnLoadedIndexes(f func(uint, uint)) {
cm.instance.onLoadedIndexes = f
}

func LoadIndexes(ci *CompressorInstance) error {
ci.mu.RLock()
lenAddrs := uint(len(ci.Compressor.AddressIndexes))
lenBytes32s := uint(len(ci.Compressor.Bytes32Indexes))
lenAddrs := ci.AddressesHeight
lenBytes32s := ci.Bytes32Height
ci.mu.RUnlock()

// Don't lock while we read the state, it can take a while

addrs, bytes32s, err := LoadState(
nah, addrs, nbh, bytes32s, err := LoadState(
ci.Context,
ci.Provider,
ci.Contract,
Expand All @@ -101,8 +109,12 @@ func LoadIndexes(ci *CompressorInstance) error {
ci.Compressor.Bytes32Indexes[k] = v
}

spew.Dump(ci.Compressor.AddressIndexes)
spew.Dump(ci.Compressor.Bytes32Indexes)
ci.AddressesHeight = nah
ci.Bytes32Height = nbh

if ci.onLoadedIndexes != nil {
ci.onLoadedIndexes(nah-lenAddrs, nbh-lenBytes32s)
}

return nil
}
Expand Down

0 comments on commit 95d65fe

Please sign in to comment.