Skip to content

Commit

Permalink
feat: FilterClaimSubmission in chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
mpolitzer committed Dec 17, 2024
1 parent 7ebea78 commit 851ae04
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 19 deletions.
1 change: 1 addition & 0 deletions cmd/cartesi-rollups-claimer/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
},
EnableSubmission: true,
MaxStartupTime: 10 * time.Second,
ChunkSize: 50000, // hallf of alchemy limit by default
}
)

Expand Down
5 changes: 4 additions & 1 deletion internal/claimer/claimer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type CreateInfo struct {
Repository *repository.Database
EnableSubmission bool
MaxStartupTime time.Duration
ChunkSize uint64
}

type Service struct {
Expand All @@ -84,6 +85,7 @@ type Service struct {
EthConn *ethclient.Client
TxOpts *bind.TransactOpts
claimsInFlight map[address]hash // -> txHash
chunkSize uint64
}

func (c *CreateInfo) LoadEnv() {
Expand All @@ -94,8 +96,9 @@ func (c *CreateInfo) LoadEnv() {
c.BlockchainHttpEndpoint.Value = config.GetBlockchainHttpEndpoint()
c.PostgresEndpoint.Value = config.GetPostgresEndpoint()
c.PollInterval = config.GetClaimerPollingInterval()
c.LogLevel = service.LogLevel(config.GetLogLevel())
c.MaxStartupTime = config.GetMaxStartupTime()
c.LogLevel = service.LogLevel(config.GetLogLevel())
c.LogPretty = config.GetLogPrettyEnabled()
}

func Create(c *CreateInfo, s *Service) error {
Expand Down
87 changes: 69 additions & 18 deletions internal/claimer/side-effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
package claimer

import (
"context"
"fmt"
"iter"
"math"
"math/big"
"os"

"github.com/cartesi/rollups-node/pkg/contracts/iconsensus"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

type sideEffects interface {
Expand Down Expand Up @@ -97,7 +102,7 @@ func (s *Service) findClaimSubmissionEventAndSucc(
*claimSubmissionEvent,
error,
) {
ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim)
ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim, s.chunkSize)
if err != nil {
s.Logger.Error("findClaimSubmissionEventAndSucc:failed",
"service", s.Name,
Expand Down Expand Up @@ -161,10 +166,58 @@ func (s *Service) pollTransaction(txHash hash) (bool, *types.Receipt, error) {
return ready, receipt, err
}

func chunkedFindSubmissionEvent(
ctx context.Context,
ic *iconsensus.IConsensus,
client *ethclient.Client,
chunk uint64,
start uint64,
end uint64,
) (iter.Seq2[*claimSubmissionEvent, error], error) {
var err error

if chunk == 0 || end < start {
return nil, os.ErrInvalid
}
if end == math.MaxUint64 {
end, err = client.BlockNumber(ctx)
if err != nil {
return nil, err
}
}

return func(yield func(*claimSubmissionEvent, error) bool) {
for start < end {
// open range, othewise we get duplicates
limit := min(start+chunk-1, end)
it, err := ic.FilterClaimSubmission(&bind.FilterOpts{
Context: ctx,
Start: start,
End: &limit,
}, nil, nil)
if err != nil {
yield(nil, err)
return
}

for it.Next() {
yield(it.Event, nil)
}
if it.Error() != nil {
yield(nil, err)
return
}

start = limit + 1
}
}, nil
}

// scan the event stream for a claimSubmission event that matches claim.
// return this event and its successor
func (s *Service) FindClaimSubmissionEventAndSucc(
claim *claimRow,
chunkSize uint64,
) (
*iconsensus.IConsensus,
*claimSubmissionEvent,
Expand All @@ -176,34 +229,32 @@ func (s *Service) FindClaimSubmissionEventAndSucc(
return nil, nil, nil, err
}

it, err := ic.FilterClaimSubmission(&bind.FilterOpts{
Context: s.Context,
Start: claim.EpochLastBlock,
}, nil, nil)
it, err := chunkedFindSubmissionEvent(s.Context, ic, s.EthConn,
claim.EpochLastBlock, math.MaxUint64, chunkSize)
if err != nil {
return nil, nil, nil, err
}
next, stop := iter.Pull2(it)
defer stop()

for it.Next() {
event := it.Event
for {
event, err, ok := next()
if err != nil || !ok {
return ic, nil, nil, err
}
lastBlock := event.LastProcessedBlockNumber.Uint64()

if claimMatchesEvent(claim, event) {
var succ *claimSubmissionEvent = nil
if it.Next() {
succ = it.Event
succ, err, ok := next()
if err != nil || !ok {
return ic, event, nil, err
}
if it.Error() != nil {
return nil, nil, nil, it.Error()
}
return ic, event, succ, nil
return ic, event, succ, err
} else if lastBlock > claim.EpochLastBlock {
err = fmt.Errorf("claim not found, searched up to %v", event)
return nil, nil, nil, err
}
}
if it.Error() != nil {
return nil, nil, nil, it.Error()
}
return ic, nil, nil, nil
}

/* poll a transaction hash for its submission status and receipt */
Expand Down

0 comments on commit 851ae04

Please sign in to comment.