diff --git a/cmd/cartesi-rollups-evm-reader/root/root.go b/cmd/cartesi-rollups-evm-reader/root/root.go index 5a57d76eb..35feb1fa3 100644 --- a/cmd/cartesi-rollups-evm-reader/root/root.go +++ b/cmd/cartesi-rollups-evm-reader/root/root.go @@ -12,6 +12,7 @@ import ( "time" "github.com/cartesi/rollups-node/internal/config" + cfg "github.com/cartesi/rollups-node/internal/evmreader/config" "github.com/cartesi/rollups-node/internal/evmreader/service" "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/internal/services/startup" @@ -97,7 +98,7 @@ func run(cmd *cobra.Command, args []string) { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - c := config.FromEnv() + c := cfg.GetEVMReaderConfig() // Override configs if verbose { @@ -115,7 +116,7 @@ func run(cmd *cobra.Command, args []string) { if defaultBlock != "" { evmReaderDefaultBlock, err := config.ToDefaultBlockFromString(defaultBlock) cobra.CheckErr(err) - c.EvmReaderDefaultBlock = evmReaderDefaultBlock + c.DefaultBlock = evmReaderDefaultBlock } // setup log @@ -130,7 +131,7 @@ func run(cmd *cobra.Command, args []string) { } defer database.Close() - _, err = startup.SetupNodePersistentConfig(ctx, database, c) + _, err = startup.SetupNodePersistentConfig(ctx, database, c.ToNodeConfig()) if err != nil { slog.Error("EVM Reader couldn't connect to the database", "error", err) os.Exit(1) @@ -141,8 +142,8 @@ func run(cmd *cobra.Command, args []string) { c.BlockchainHttpEndpoint.Value, c.BlockchainWsEndpoint.Value, database, - c.EvmReaderRetryPolicyMaxRetries, - c.EvmReaderRetryPolicyMaxDelay, + c.RetryPolicyMaxRetries, + c.RetryPolicyMaxDelay, ) // logs startup time diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index d57a52f10..5a8232c93 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -1,49 +1,89 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) +// Algorithm for the state transition of computed claims. Possible actions are: +// - update epoch in the database +// - submit claim to blockchain +// - transition application to an invalid state +// +// 1. On startup of a clean blockchain there are no previous claims nor events. +// +// - This configuration must submit a new computed claim. +// +// 2. Some time after the submission, the computed claim shows up as a claimSubmission +// event in the blockchain. The claim and event must match. +// +// - This configuration must update the epoch in the database: computed -> submitted +// +// 3. After the first epoch, additional checks must be done. Same as (1) otherwise. +// 3.1. No epoch was skipped: +// - previous_claim.last_block < current_claim.first_block +// +// 4. After the first epoch, additional checks must be done. Same as (2) otherwise. +// 4.1. epochs are in order: +// - previous_claim.last_block < current_claim.first_block +// +// 4.2. There are no events between the epochs +// - next(previous_event) == current_event +// +// Other cases are errors. +// +// | n | prev | curr | action | +// | | claim | event | claim | event | | +// |---+-------+-------+-------+-------+--------+ +// | 1 | . | . | cc | . | submit | +// | 2 | . | . | cc | ce | update | +// | 3 | pc | pe | cc | . | submit | +// | 4 | pc | pe | cc | ce | update | package claimer import ( "context" "fmt" - . "github.com/cartesi/rollups-node/internal/config" - . "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/internal/config" + "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/accounts/abi/bind" - . "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" ) +var ( + ErrClaimMismatch = fmt.Errorf("claim and antecessor mismatch") + ErrEventMismatch = fmt.Errorf("Computed Claim mismatches ClaimSubmission event") + ErrMissingEvent = fmt.Errorf("accepted claim has no matching blockchain event") +) + +type address = common.Address +type hash = common.Hash +type claimRow = repository.ClaimRow +type claimSubmissionEvent = iconsensus.IConsensusClaimSubmission + type CreateInfo struct { service.CreateInfo - Auth Auth + Auth config.Auth - BlockchainHttpEndpoint Redacted[string] + BlockchainHttpEndpoint config.Redacted[string] EthConn *ethclient.Client - PostgresEndpoint Redacted[string] - DBConn *Database + PostgresEndpoint config.Redacted[string] + DBConn *repository.Database EnableSubmission bool } -type claimKey struct { - hash Hash - epochLastBlock uint64 -} - type Service struct { service.Service submissionEnabled bool - DBConn *Database + DBConn *repository.Database EthConn *ethclient.Client TxOpts *bind.TransactOpts - ClaimsInFlight map[claimKey]Hash // -> txHash + claimsInFlight map[address]hash // -> txHash } func Create(ci CreateInfo, s *Service) error { @@ -67,7 +107,7 @@ func Create(ci CreateInfo, s *Service) error { if s.DBConn == nil { if ci.DBConn == nil { - ci.DBConn, err = Connect(s.Context, ci.PostgresEndpoint.Value) + ci.DBConn, err = repository.Connect(s.Context, ci.PostgresEndpoint.Value) if err != nil { return err } @@ -75,8 +115,8 @@ func Create(ci CreateInfo, s *Service) error { s.DBConn = ci.DBConn } - if s.ClaimsInFlight == nil { - s.ClaimsInFlight = map[claimKey]Hash{} + if s.claimsInFlight == nil { + s.claimsInFlight = map[address]hash{} } if s.submissionEnabled && s.TxOpts == nil { @@ -106,151 +146,189 @@ func (s *Service) Stop(bool) []error { } func (s *Service) Tick() []error { - err := s.submitClaimsAndUpdateDatabase(s) - if err != nil { - return []error{err} - } - return nil + return s.submitClaimsAndUpdateDatabase(s) } -func computedClaimToKey(c *ComputedClaim) claimKey { - return claimKey{ - hash: c.Hash, - epochLastBlock: c.EpochLastBlock, - } -} - -func eventToKey(e *iconsensus.IConsensusClaimSubmission) claimKey { - return claimKey{ - hash: e.Claim, - epochLastBlock: e.LastProcessedBlockNumber.Uint64(), - } -} - -func (s *Service) submitClaimsAndUpdateDatabase(se SideEffects) error { - claims, err := se.selectComputedClaims() +func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { + errs := []error{} + prevClaims, currClaims, err := se.selectClaimPairsPerApp() if err != nil { - return err - } - - computedClaimsMap := make(map[claimKey]*ComputedClaim) - for i := 0; i < len(claims); i++ { - computedClaimsMap[computedClaimToKey(&claims[i])] = &claims[i] + errs = append(errs, err) + return errs } // check claims in flight - for key, txHash := range s.ClaimsInFlight { + for key, txHash := range s.claimsInFlight { ready, receipt, err := se.pollTransaction(txHash) if err != nil { - return err + errs = append(errs, err) + return errs } if !ready { continue } - - if claim, ok := computedClaimsMap[key]; ok { - err = se.updateEpochWithSubmittedClaim( - s.DBConn, - s.Context, - claim, - receipt.TxHash) + if claim, ok := currClaims[key]; ok { + err = se.updateEpochWithSubmittedClaim(&claim, receipt.TxHash) if err != nil { - return err + errs = append(errs, err) + return errs } - delete(s.ClaimsInFlight, key) - delete(computedClaimsMap, key) s.Logger.Info("claimer: Claim submitted", "app", claim.AppContractAddress, - "claim", claim.Hash, + "claim", claim.EpochHash, "last_block", claim.EpochLastBlock, + "tx", txHash) + delete(currClaims, key) + } else { + s.Logger.Warn("claimer: expected claim in flight to be in currClaims.", "tx", receipt.TxHash) } + delete(s.claimsInFlight, key) } - // check event logs for the remaining claims, submit if not found - for i := 0; i < len(claims); i++ { - claimDB := &claims[i] - key := computedClaimToKey(claimDB) - _, isSelected := computedClaimsMap[key] - _, isInFlight := s.ClaimsInFlight[key] - if !isSelected || isInFlight { + // check computed claims + nextApp: for key, currClaimRow := range currClaims { + var ic *iconsensus.IConsensus = nil + var prevEvent *claimSubmissionEvent = nil + var currEvent *claimSubmissionEvent = nil + + if _, isInFlight := s.claimsInFlight[key]; isInFlight { continue } - s.Logger.Info("claimer: Checking if there was previous submitted claims", - "app", claimDB.AppContractAddress, - "claim", claimDB.Hash, - "last_block", claimDB.EpochLastBlock, - ) - it, inst, err := se.enumerateSubmitClaimEventsSince( - s.EthConn, s.Context, - claimDB.AppIConsensusAddress, - claimDB.EpochLastBlock+1) - if err != nil { - return err - } + if prevClaimRow, ok := prevClaims[key]; ok { + err := checkClaimsConstraint(&prevClaimRow, &currClaimRow) + if err != nil { + s.Logger.Error("claimer: database mismatch", + "prevClaim", prevClaimRow, + "currClaim", currClaimRow, + "err", err, + ) + delete(currClaims, key) + errs = append(errs, err) + goto nextApp + } - for it.Next() { - event := it.Event() - eventKey := eventToKey(event) - claim, ok := computedClaimsMap[eventKey] - - if event.LastProcessedBlockNumber.Uint64() > claimDB.EpochLastBlock { - // found a newer event than the claim we are processing. - s.Logger.Error("claimer: Found a newer event than claim", - "app", claimDB.AppContractAddress, - "claim", Hash(event.Claim), - "claim_last_block", claimDB.EpochLastBlock, - "event_last_block", event.LastProcessedBlockNumber.Uint64(), + // if prevClaimRow exists, there must be a matching event + ic, prevEvent, currEvent, err = + se.findClaimSubmissionEventAndSucc(&prevClaimRow) + if err != nil { + delete(currClaims, key) + errs = append(errs, err) + goto nextApp + } + if prevEvent == nil { + s.Logger.Error("claimer: missing event", + "claim", prevClaimRow, + "err", ErrMissingEvent, ) - return fmt.Errorf("Application in invalid state") - // TODO: put the application in an invalid state + delete(currClaims, key) + errs = append(errs, ErrMissingEvent) + goto nextApp } - s.Logger.Debug("claimer: Found previous submitted claim event", - "app", claimDB.AppContractAddress, - "claim", Hash(event.Claim), - "last_block", event.LastProcessedBlockNumber, - ) - if ok { - s.Logger.Info("claimer: Claim was previously submitted, updating the database", - "app", claimDB.AppContractAddress, - "claim", claimDB.Hash, - "last_block", claimDB.EpochLastBlock, + if !claimMatchesEvent(&prevClaimRow, prevEvent) { + s.Logger.Error("claimer: event mismatch", + "claim", prevClaimRow, + "event", prevEvent, + "err", ErrEventMismatch, ) - err := se.updateEpochWithSubmittedClaim( - s.DBConn, - s.Context, - claim, - event.Raw.TxHash) - if err != nil { - return err - } - delete(computedClaimsMap, eventKey) - break + delete(currClaims, key) + errs = append(errs, ErrEventMismatch) + goto nextApp + } + } else { + // first claim + ic, currEvent, _, err = + se.findClaimSubmissionEventAndSucc(&currClaimRow) + if err != nil { + delete(currClaims, key) + errs = append(errs, err) + goto nextApp } - } - if err := it.Error(); err != nil { - return err } - // submit if not found in the logs (fetch from hash again, can be stale) - if claim, ok := computedClaimsMap[key]; ok && s.submissionEnabled { - s.Logger.Info("claimer: Submitting claim to blockchain", - "app", claim.AppContractAddress, - "claim", claim.Hash, - "last_block", claim.EpochLastBlock, - ) - txHash, err := se.submitClaimToBlockchain(inst, s.TxOpts, claim) + if currEvent != nil { + if !claimMatchesEvent(&currClaimRow, currEvent) { + s.Logger.Error("claimer: event mismatch", + "claim", currClaimRow, + "event", currEvent, + "err", ErrEventMismatch, + ) + delete(currClaims, key) + errs = append(errs, ErrEventMismatch) + goto nextApp + } + txHash := currEvent.Raw.TxHash + err = se.updateEpochWithSubmittedClaim(&currClaimRow, txHash) if err != nil { - return err + delete(currClaims, key) + errs = append(errs, err) + goto nextApp + } + delete(s.claimsInFlight, key) + } else if s.submissionEnabled { + txHash, err := se.submitClaimToBlockchain(ic, &currClaimRow) + if err != nil { + delete(currClaims, key) + errs = append(errs, err) + goto nextApp } - s.ClaimsInFlight[key] = txHash - delete(computedClaimsMap, key) + s.Logger.Info("claimer: Submitting claim to blockchain", + "app", currClaimRow.AppContractAddress, + "claim", currClaimRow.EpochHash, + "last_block", currClaimRow.EpochLastBlock, + ) + s.claimsInFlight[currClaimRow.AppContractAddress] = txHash } } + return errs +} + +func checkClaimConstraint(c *claimRow) error { + zeroAddress := address{} + + if c.EpochFirstBlock > c.EpochLastBlock { + return ErrClaimMismatch + } + if c.AppIConsensusAddress == zeroAddress { + return ErrClaimMismatch + } + return nil +} + +func checkClaimsConstraint(p *claimRow, c *claimRow) error { + var err error + + err = checkClaimConstraint(c) + if err != nil { + return err + } + err = checkClaimConstraint(p) + if err != nil { + return err + } + + // p, c consistent + if p.AppContractAddress != c.AppContractAddress { + return ErrClaimMismatch + } + if p.EpochLastBlock > c.EpochLastBlock { + return ErrClaimMismatch + } + if p.EpochFirstBlock > c.EpochFirstBlock { + return ErrClaimMismatch + } + if p.EpochIndex > c.EpochIndex { + return ErrClaimMismatch + } return nil } +func claimMatchesEvent(c *claimRow, e *claimSubmissionEvent) bool { + return c.AppContractAddress == e.AppContract && + c.EpochLastBlock == e.LastProcessedBlockNumber.Uint64() +} + func (s *Service) Start(context context.Context, ready chan<- struct{}) error { ready <- struct{}{} return s.Serve() diff --git a/internal/claimer/claimer_test.go b/internal/claimer/claimer_test.go index ea4559e79..0591b288f 100644 --- a/internal/claimer/claimer_test.go +++ b/internal/claimer/claimer_test.go @@ -4,309 +4,537 @@ package claimer import ( - "context" "log/slog" "math/big" + "os" "testing" - . "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/service" + "github.com/lmittmann/tint" - "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" . "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) -type ServiceMock struct { +type serviceMock struct { mock.Mock Service } -func (m *ServiceMock) submitClaimToBlockchain( - instance *iconsensus.IConsensus, - signer *bind.TransactOpts, - claim *ComputedClaim, -) (Hash, error) { - args := m.Called(nil, nil, claim) - return args.Get(0).(Hash), args.Error(1) -} - -func (m *ServiceMock) selectComputedClaims() ([]ComputedClaim, error) { +func (m *serviceMock) selectClaimPairsPerApp() ( + map[address]claimRow, + map[address]claimRow, + error, +) { args := m.Called() - return args.Get(0).([]ComputedClaim), args.Error(1) + return args.Get(0).(map[address]claimRow), + args.Get(1).(map[address]claimRow), + args.Error(2) } - -func (m *ServiceMock) updateEpochWithSubmittedClaim( - dbConn *Database, - context context.Context, - claim *ComputedClaim, +func (m *serviceMock) updateEpochWithSubmittedClaim( + claim *claimRow, txHash Hash, ) error { - args := m.Called(nil, nil, claim, txHash) + args := m.Called(claim, txHash) return args.Error(0) } -func (m *ServiceMock) enumerateSubmitClaimEventsSince( - ethConn *ethclient.Client, - context context.Context, - appIConsensusAddr Address, - epochLastBlock uint64, +func (m *serviceMock) findClaimSubmissionEventAndSucc( + claim *claimRow, ) ( - IClaimSubmissionIterator, *iconsensus.IConsensus, + *claimSubmissionEvent, + *claimSubmissionEvent, error, ) { - args := m.Called() - return args.Get(0).(IClaimSubmissionIterator), - args.Get(1).(*iconsensus.IConsensus), - args.Error(2) + args := m.Called(claim) + return args.Get(0).(*iconsensus.IConsensus), + args.Get(1).(*claimSubmissionEvent), + args.Get(2).(*claimSubmissionEvent), + args.Error(3) } - -func (m *ServiceMock) pollTransaction(txHash Hash) (bool, *types.Receipt, error) { +func (m *serviceMock) submitClaimToBlockchain( + instance *iconsensus.IConsensus, + claim *claimRow, +) (Hash, error) { + args := m.Called(nil, claim) + return args.Get(0).(Hash), args.Error(1) +} +func (m *serviceMock) pollTransaction(txHash Hash) (bool, *types.Receipt, error) { args := m.Called(txHash) return args.Bool(0), args.Get(1).(*types.Receipt), args.Error(2) } -func newServiceMock() *ServiceMock { - return &ServiceMock{ +func newServiceMock() *serviceMock { + opts := &tint.Options{ + Level: slog.LevelDebug, + AddSource: true, + // RFC3339 with milliseconds and without timezone + TimeFormat: "2006-01-02T15:04:05.000", + } + handler := tint.NewHandler(os.Stdout, opts) + + return &serviceMock{ Service: Service{ Service: service.Service{ - Logger: slog.Default(), + Logger: slog.New(handler), }, submissionEnabled: true, + claimsInFlight: map[address]hash{}, }, } } -type ClaimSubmissionIteratorMock struct { - mock.Mock -} +// ////////////////////////////////////////////////////////////////////////////// +// Success +// ////////////////////////////////////////////////////////////////////////////// +func TestDoNothing(t *testing.T) { + m := newServiceMock() + prevClaims := map[address]claimRow{} + currClaims := map[address]claimRow{} -func (m *ClaimSubmissionIteratorMock) Next() bool { - args := m.Called() - return args.Bool(0) -} + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) -func (m *ClaimSubmissionIteratorMock) Error() error { - args := m.Called() - return args.Error(0) + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) } -func (m *ClaimSubmissionIteratorMock) Event() *iconsensus.IConsensusClaimSubmission { - args := m.Called() - return args.Get(0).(*iconsensus.IConsensusClaimSubmission) -} - -// ////////////////////////////////////////////////////////////////////////////// -// Test -// ////////////////////////////////////////////////////////////////////////////// - -// Do notghing when there are no claims to process -func TestEmptySelectComputedClaimsDoesNothing(t *testing.T) { +func TestSubmitFirstClaim(t *testing.T) { m := newServiceMock() - m.ClaimsInFlight = make(map[claimKey]Hash) - - m.On("selectComputedClaims").Return([]ComputedClaim{}, nil) + appContractAddress := common.HexToAddress("0x01") + claimTransactionHash := common.HexToHash("0x10") + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } - err := m.submitClaimsAndUpdateDatabase(m) - assert.Nil(t, err) + var prevEvent *claimSubmissionEvent = nil + var currEvent *claimSubmissionEvent = nil + prevClaims := map[address]claimRow{} + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } - m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) - m.AssertNumberOfCalls(t, "selectComputedClaims", 1) - m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) - m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 0) + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &currClaim). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + m.On("submitClaimToBlockchain", nil, &currClaim). + Return(claimTransactionHash, nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) + assert.Equal(t, len(m.claimsInFlight), 1) + m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1) m.AssertNumberOfCalls(t, "pollTransaction", 0) + m.AssertNumberOfCalls(t, "selectClaimPairsPerApp", 1) + m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 1) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) } -// Got a claim. -// Submit if new (by checking the event logs) -func TestSubmitNewClaim(t *testing.T) { +func TestSubmitClaimWithAntecessor(t *testing.T) { m := newServiceMock() + appContractAddress := common.HexToAddress("0x01") + claimTransactionHash := common.HexToHash("0x10") + prevClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 1, + EpochFirstBlock: 10, + EpochLastBlock: 19, + } + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } - newClaimHash := HexToHash("0x01") - newClaimTxHash := HexToHash("0x10") - newClaim := ComputedClaim{ - Hash: newClaimHash, - } - m.ClaimsInFlight = map[claimKey]Hash{} - m.On("selectComputedClaims").Return([]ComputedClaim{ - newClaim, - }, nil) - m.On("submitClaimToBlockchain", nil, nil, &newClaim). - Return(newClaimTxHash, nil) - - itMock := &ClaimSubmissionIteratorMock{} - itMock.On("Next").Return(false) - itMock.On("Error").Return(nil) - - m.On("enumerateSubmitClaimEventsSince"). - Return(itMock, &iconsensus.IConsensus{}, nil) - m.On("pollTransaction", newClaimTxHash). - Return(false, &types.Receipt{}, nil) - assert.Equal(t, len(m.ClaimsInFlight), 0) - - err := m.submitClaimsAndUpdateDatabase(m) - assert.Nil(t, err) - - assert.Equal(t, len(m.ClaimsInFlight), 1) - m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 1) + prevClaims := map[address]claimRow{ + appContractAddress: prevClaim, + } + var currEvent *claimSubmissionEvent = nil + prevEvent := &claimSubmissionEvent{ + LastProcessedBlockNumber: new(big.Int).SetUint64(prevClaim.EpochLastBlock), + AppContract: appContractAddress, + } + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } + + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &prevClaim). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + m.On("submitClaimToBlockchain", nil, &currClaim). + Return(claimTransactionHash, nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) + assert.Equal(t, len(m.claimsInFlight), 1) + m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1) m.AssertNumberOfCalls(t, "pollTransaction", 0) - m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "selectClaimPairsPerApp", 1) m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 1) m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) } -// Got a claim, don't submit. -func TestSubmitNewClaimDisabled(t *testing.T) { +func TestSkipSubmitFirstClaim(t *testing.T) { m := newServiceMock() m.submissionEnabled = false + appContractAddress := common.HexToAddress("0x01") + claimTransactionHash := common.HexToHash("0x10") + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } + + var prevEvent *claimSubmissionEvent = nil + var currEvent *claimSubmissionEvent = nil + prevClaims := map[address]claimRow{} + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } - newClaimHash := HexToHash("0x01") - newClaimTxHash := HexToHash("0x10") - newClaim := ComputedClaim{ - Hash: newClaimHash, - } - m.ClaimsInFlight = map[claimKey]Hash{} - m.On("selectComputedClaims").Return([]ComputedClaim{ - newClaim, - }, nil) - m.On("submitClaimToBlockchain", nil, nil, &newClaim). - Return(newClaimTxHash, nil) - - itMock := &ClaimSubmissionIteratorMock{} - itMock.On("Next").Return(false) - itMock.On("Error").Return(nil) - - m.On("enumerateSubmitClaimEventsSince"). - Return(itMock, &iconsensus.IConsensus{}, nil) - m.On("pollTransaction", newClaimTxHash). - Return(false, &types.Receipt{}, nil) - assert.Equal(t, len(m.ClaimsInFlight), 0) - - err := m.submitClaimsAndUpdateDatabase(m) - assert.Nil(t, err) - - assert.Equal(t, len(m.ClaimsInFlight), 0) - m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 1) + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &currClaim). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + m.On("submitClaimToBlockchain", nil, &currClaim). + Return(claimTransactionHash, nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) + assert.Equal(t, len(m.claimsInFlight), 0) + m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1) m.AssertNumberOfCalls(t, "pollTransaction", 0) - m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "selectClaimPairsPerApp", 1) m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) } - -// Query the blockchain for the submitClaim transaction, it may not be ready yet -func TestClaimInFlightNotReadyDoesNothing(t *testing.T) { +func TestSkipSubmitClaimWithAntecessor(t *testing.T) { m := newServiceMock() + m.submissionEnabled = false + appContractAddress := common.HexToAddress("0x01") + claimTransactionHash := common.HexToHash("0x10") + prevClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 1, + EpochFirstBlock: 10, + EpochLastBlock: 19, + } + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } - claimInFlightHash := HexToHash("0x01") - claimInFlightTxHash := HexToHash("0x10") - claimInFlight := ComputedClaim{ - Hash: claimInFlightHash, - EpochLastBlock: 1, + prevClaims := map[address]claimRow{ + appContractAddress: prevClaim, + } + var currEvent *claimSubmissionEvent = nil + prevEvent := &claimSubmissionEvent{ + LastProcessedBlockNumber: new(big.Int).SetUint64(prevClaim.EpochLastBlock), + AppContract: appContractAddress, } - m.ClaimsInFlight = map[claimKey]Hash{ - computedClaimToKey(&claimInFlight): claimInFlightTxHash, + currClaims := map[address]claimRow{ + appContractAddress: currClaim, } - m.On("selectComputedClaims").Return([]ComputedClaim{ - claimInFlight, - }, nil) - m.On("pollTransaction", claimInFlightTxHash). - Return(false, &types.Receipt{}, nil) - assert.Equal(t, len(m.ClaimsInFlight), 1) - err := m.submitClaimsAndUpdateDatabase(m) - assert.Nil(t, err) + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &prevClaim). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + m.On("submitClaimToBlockchain", nil, &currClaim). + Return(claimTransactionHash, nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) + assert.Equal(t, len(m.claimsInFlight), 0) + m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1) + m.AssertNumberOfCalls(t, "pollTransaction", 0) + m.AssertNumberOfCalls(t, "selectClaimPairsPerApp", 1) + m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) +} - assert.Equal(t, len(m.ClaimsInFlight), 1) - m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 0) +func TestInFlightCompleted(t *testing.T) { + m := newServiceMock() + appContractAddress := common.HexToAddress("0x01") + reqHash := common.HexToHash("0x10") + txHash := common.HexToHash("0x100") + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 1, + EpochFirstBlock: 10, + EpochLastBlock: 19, + } + prevClaims := map[address]claimRow{} + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } + m.claimsInFlight[appContractAddress] = reqHash + + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("pollTransaction", reqHash). + Return(true, &types.Receipt{ + ContractAddress: appContractAddress, + TxHash: txHash, + }, nil) + m.On("updateEpochWithSubmittedClaim", &currClaim, txHash). + Return(nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) + assert.Equal(t, len(m.claimsInFlight), 0) + m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 0) m.AssertNumberOfCalls(t, "pollTransaction", 1) - m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "selectClaimPairsPerApp", 1) m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) - m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 1) } -// Update ClaimsInFlight and the database when a submitClaim transaction is completed -func TestUpdateClaimInFlightViaPollTransaction(t *testing.T) { +func TestUpdateFirstClaim(t *testing.T) { m := newServiceMock() + appContractAddress := common.HexToAddress("0x01") + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } - claimInFlightHash := HexToHash("0x01") - claimInFlightTxHash := HexToHash("0x10") - claimInFlight := ComputedClaim{ - Hash: claimInFlightHash, - EpochLastBlock: 1, - } - m.ClaimsInFlight = map[claimKey]Hash{ - computedClaimToKey(&claimInFlight): claimInFlightTxHash, - } - m.On("selectComputedClaims").Return([]ComputedClaim{ - claimInFlight, - }, nil) - m.On("pollTransaction", claimInFlightTxHash). - Return(true, &types.Receipt{TxHash: claimInFlightTxHash}, nil) - m.On("updateEpochWithSubmittedClaim", - nil, nil, &claimInFlight, claimInFlightTxHash).Return(nil) - assert.Equal(t, len(m.ClaimsInFlight), 1) - - err := m.submitClaimsAndUpdateDatabase(m) - assert.Nil(t, err) - - assert.Equal(t, len(m.ClaimsInFlight), 0) - m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 0) - m.AssertNumberOfCalls(t, "pollTransaction", 1) - m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + var nilEvent *claimSubmissionEvent = nil + currEvent := claimSubmissionEvent{ + AppContract: appContractAddress, + LastProcessedBlockNumber: new(big.Int).SetUint64(currClaim.EpochLastBlock), + } + prevClaims := map[address]claimRow{} + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &currClaim). + Return(&iconsensus.IConsensus{}, &currEvent, nilEvent, nil) + m.On("updateEpochWithSubmittedClaim", &currClaim, currEvent.Raw.TxHash). + Return(nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) + assert.Equal(t, len(m.claimsInFlight), 0) + m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1) + m.AssertNumberOfCalls(t, "pollTransaction", 0) + m.AssertNumberOfCalls(t, "selectClaimPairsPerApp", 1) m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 1) } -// This shouldn't happen normally, -// but a submitClaim transaction may be on the blockchain but the database not know it. -// The blockchain is the source of truth in any case. -// So search for the transaction in the event logs. -// And if found, update the database. -func TestUpdateClaimViaEventLog(t *testing.T) { +func TestUpdateClaimWithAntecessor(t *testing.T) { m := newServiceMock() + appContractAddress := common.HexToAddress("0x01") + prevClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 1, + EpochFirstBlock: 10, + EpochLastBlock: 19, + } + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } - existingClaimHash := HexToHash("0x01") - existingClaimTxHash := HexToHash("0x10") - existingClaim := ComputedClaim{ - Hash: existingClaimHash, - EpochLastBlock: 10, - } - m.ClaimsInFlight = map[claimKey]Hash{} - m.On("selectComputedClaims").Return([]ComputedClaim{ - existingClaim, - }, nil) - m.On("updateEpochWithSubmittedClaim", - nil, nil, &existingClaim, existingClaimTxHash).Return(nil) - - itMock := &ClaimSubmissionIteratorMock{} - itMock.On("Next").Return(true).Once() - itMock.On("Error").Return(nil) - itMock.On("Event").Return(&iconsensus.IConsensusClaimSubmission{ - Claim: existingClaimHash, - LastProcessedBlockNumber: big.NewInt(10), - Raw: types.Log{TxHash: existingClaimTxHash}, - }).Once() - - m.On("enumerateSubmitClaimEventsSince"). - Return(itMock, &iconsensus.IConsensus{}, nil) - assert.Equal(t, len(m.ClaimsInFlight), 0) - - err := m.submitClaimsAndUpdateDatabase(m) - assert.Nil(t, err) - - assert.Equal(t, len(m.ClaimsInFlight), 0) - m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 1) + prevEvent := claimSubmissionEvent{ + AppContract: appContractAddress, + LastProcessedBlockNumber: new(big.Int).SetUint64(prevClaim.EpochLastBlock), + } + currEvent := claimSubmissionEvent{ + AppContract: appContractAddress, + LastProcessedBlockNumber: new(big.Int).SetUint64(currClaim.EpochLastBlock), + } + prevClaims := map[address]claimRow{ + appContractAddress: prevClaim, + } + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &prevClaim). + Return(&iconsensus.IConsensus{}, &prevEvent, &currEvent, nil) + m.On("updateEpochWithSubmittedClaim", &currClaim, currEvent.Raw.TxHash). + Return(nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 0) + assert.Equal(t, len(m.claimsInFlight), 0) + m.AssertNumberOfCalls(t, "findClaimSubmissionEventAndSucc", 1) m.AssertNumberOfCalls(t, "pollTransaction", 0) - m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "selectClaimPairsPerApp", 1) m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 1) } + +// ////////////////////////////////////////////////////////////////////////////// +// Failure +// ////////////////////////////////////////////////////////////////////////////// + +// !claimMatchesEvent(prevClaim, prevEvent) +func TestSubmitClaimWithAntecessorMismatch(t *testing.T) { + m := newServiceMock() + appContractAddress := common.HexToAddress("0x01") + claimTransactionHash := common.HexToHash("0x10") + prevClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 1, + EpochFirstBlock: 10, + EpochLastBlock: 19, + } + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } + + prevClaims := map[address]claimRow{ + appContractAddress: prevClaim, + } + var currEvent *claimSubmissionEvent = nil + prevEvent := &claimSubmissionEvent{ + LastProcessedBlockNumber: new(big.Int).SetUint64(prevClaim.EpochLastBlock + 1), + AppContract: appContractAddress, + } + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } + + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &prevClaim). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + m.On("submitClaimToBlockchain", nil, &currClaim). + Return(claimTransactionHash, nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 1) + assert.Equal(t, errs[0], ErrEventMismatch) +} + +// !claimMatchesEvent(currClaim, currEvent) +func TestSubmitClaimWithEventMismatch(t *testing.T) { + m := newServiceMock() + appContractAddress := common.HexToAddress("0x01") + prevClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 1, + EpochFirstBlock: 10, + EpochLastBlock: 19, + } + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 3, + EpochFirstBlock: 30, + EpochLastBlock: 39, + } + + prevEvent := claimSubmissionEvent{ + AppContract: appContractAddress, + LastProcessedBlockNumber: new(big.Int).SetUint64(prevClaim.EpochLastBlock), + } + currEvent := claimSubmissionEvent{ + AppContract: appContractAddress, + LastProcessedBlockNumber: new(big.Int).SetUint64(prevClaim.EpochLastBlock + 1), + } + prevClaims := map[address]claimRow{ + appContractAddress: prevClaim, + } + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &prevClaim). + Return(&iconsensus.IConsensus{}, &prevEvent, &currEvent, nil) + m.On("updateEpochWithSubmittedClaim", &currClaim, currEvent.Raw.TxHash). + Return(nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 1) + assert.Equal(t, errs[0], ErrEventMismatch) +} + +// !checkClaimsConstraint(prevClaim, currClaim) +func TestSubmitClaimWithAntecessorOutOfOrder(t *testing.T) { + m := newServiceMock() + appContractAddress := common.HexToAddress("0x01") + claimTransactionHash := common.HexToHash("0x10") + prevClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 2, + EpochFirstBlock: 20, + EpochLastBlock: 29, + } + currClaim := claimRow{ + AppContractAddress: appContractAddress, + AppIConsensusAddress: appContractAddress, + EpochIndex: 1, + EpochFirstBlock: 10, + EpochLastBlock: 19, + } + + prevClaims := map[address]claimRow{ + appContractAddress: prevClaim, + } + var currEvent *claimSubmissionEvent = nil + prevEvent := &claimSubmissionEvent{ + LastProcessedBlockNumber: new(big.Int).SetUint64(prevClaim.EpochLastBlock + 1), + AppContract: appContractAddress, + } + currClaims := map[address]claimRow{ + appContractAddress: currClaim, + } + + m.On("selectClaimPairsPerApp"). + Return(prevClaims, currClaims, nil) + m.On("findClaimSubmissionEventAndSucc", &prevClaim). + Return(&iconsensus.IConsensus{}, prevEvent, currEvent, nil) + m.On("submitClaimToBlockchain", nil, &currClaim). + Return(claimTransactionHash, nil) + + errs := m.submitClaimsAndUpdateDatabase(m) + assert.Equal(t, len(errs), 1) + assert.Equal(t, errs[0], ErrClaimMismatch) +} + diff --git a/internal/claimer/side-effects.go b/internal/claimer/side-effects.go index 1c375c736..2b243072b 100644 --- a/internal/claimer/side-effects.go +++ b/internal/claimer/side-effects.go @@ -4,193 +4,142 @@ package claimer import ( - "context" + "fmt" "math/big" - //. "github.com/cartesi/rollups-node/internal/config" - . "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - . "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" ) -type IClaimSubmissionIterator interface { - Next() bool - Error() error - Event() *iconsensus.IConsensusClaimSubmission -} - -type ClaimSubmissionIterator struct { - iterator *iconsensus.IConsensusClaimSubmissionIterator -} - -func (p *ClaimSubmissionIterator) Next() bool { - return p.iterator.Next() -} - -func (p *ClaimSubmissionIterator) Error() error { - return p.iterator.Error() -} - -func (p *ClaimSubmissionIterator) Event() *iconsensus.IConsensusClaimSubmission { - return p.iterator.Event -} - -type SideEffects interface { - submitClaimToBlockchain( - instance *iconsensus.IConsensus, - signer *bind.TransactOpts, - claim *ComputedClaim, - ) (Hash, error) - - selectComputedClaims() ([]ComputedClaim, error) - +type sideEffects interface { + // database + selectClaimPairsPerApp() ( + map[address]claimRow, + map[address]claimRow, + error, + ) updateEpochWithSubmittedClaim( - DBConn *Database, - context context.Context, - claim *ComputedClaim, - txHash Hash, + claim *claimRow, + txHash hash, ) error - enumerateSubmitClaimEventsSince( - EthConn *ethclient.Client, - context context.Context, - appIConsensusAddr Address, - epochLastBlock uint64, + // blockchain + findClaimSubmissionEventAndSucc( + claim *claimRow, ) ( - IClaimSubmissionIterator, *iconsensus.IConsensus, - error) - - pollTransaction(txHash Hash) (bool, *types.Receipt, error) -} - -func (s *Service) submitClaimToBlockchain( - instance *iconsensus.IConsensus, - signer *bind.TransactOpts, - claim *ComputedClaim, -) (Hash, error) { - txHash := Hash{} - lastBlockNumber := new(big.Int).SetUint64(claim.EpochLastBlock) - tx, err := instance.SubmitClaim(signer, claim.AppContractAddress, - lastBlockNumber, claim.Hash) - if err != nil { - s.Logger.Error("submitClaimToBlockchain:failed", - "service", s.Name, - "appContractAddress", claim.AppContractAddress, - "claimHash", claim.Hash, - "error", err) - } else { - txHash = tx.Hash() - s.Logger.Debug("SubmitClaimToBlockchain:success", - "service", s.Name, - "appContractAddress", claim.AppContractAddress, - "claimHash", claim.Hash, - "TxHash", txHash) - } - return txHash, err + *claimSubmissionEvent, + *claimSubmissionEvent, + error, + ) + submitClaimToBlockchain( + ic *iconsensus.IConsensus, + claim *claimRow, + ) ( + hash, + error, + ) + pollTransaction(txHash hash) ( + bool, + *types.Receipt, + error, + ) } -func (s *Service) selectComputedClaims() ([]ComputedClaim, error) { - claims, err := s.DBConn.SelectComputedClaims(s.Context) +func (s *Service) selectClaimPairsPerApp() ( + map[address]claimRow, + map[address]claimRow, + error, +) { + computed, accepted, err := s.DBConn.SelectClaimPairsPerApp(s.Context) if err != nil { - s.Logger.Error("SelectComputedClaims:failed", + s.Logger.Error("selectClaimPairsPerApp:failed", "service", s.Name, "error", err) } else { - var ids []uint64 - for _, claim := range claims { - ids = append(ids, claim.EpochID) - } - s.Logger.Debug("SelectComputedClaims:success", + s.Logger.Debug("selectClaimPairsPerApp:success", "service", s.Name, - "claims", len(claims), - "ids", ids, - "inFlight", len(s.ClaimsInFlight)) + "len(computed)", len(computed), + "len(accepted)", len(accepted)) } - return claims, err + return accepted, computed, err } /* update the database epoch status to CLAIM_SUBMITTED and add a transaction hash */ func (s *Service) updateEpochWithSubmittedClaim( - DBConn *Database, - context context.Context, - claim *ComputedClaim, - txHash Hash, + claim *claimRow, + txHash hash, ) error { - err := DBConn.UpdateEpochWithSubmittedClaim(context, claim.EpochID, txHash) + err := s.DBConn.UpdateEpochWithSubmittedClaim(s.Context, claim.EpochID, txHash) if err != nil { - s.Logger.Error("UpdateEpochWithSubmittedClaim:failed", + s.Logger.Error("updateEpochWithSubmittedClaim:failed", "service", s.Name, "appContractAddress", claim.AppContractAddress, - "hash", claim.Hash, + "hash", claim.EpochHash, "txHash", txHash, "error", err) } else { - s.Logger.Debug("UpdateEpochWithSubmittedClaim:success", + s.Logger.Debug("updateEpochWithSubmittedClaim:success", "service", s.Name, "appContractAddress", claim.AppContractAddress, - "hash", claim.Hash, + "hash", claim.EpochHash, "txHash", txHash) } return err } -func (s *Service) enumerateSubmitClaimEventsSince( - EthConn *ethclient.Client, - context context.Context, - appIConsensusAddr Address, - epochLastBlock uint64, +func (s *Service) findClaimSubmissionEventAndSucc( + claim *claimRow, ) ( - IClaimSubmissionIterator, *iconsensus.IConsensus, + *claimSubmissionEvent, + *claimSubmissionEvent, error, ) { - it, ic, err := s.EnumerateSubmitClaimEventsSince( - EthConn, context, appIConsensusAddr, epochLastBlock) - + ic, curr, next, err := s.FindClaimSubmissionEventAndSucc(claim) if err != nil { - s.Logger.Error("EnumerateSubmitClaimEventsSince:failed", + s.Logger.Error("findClaimSubmissionEventAndSucc:failed", "service", s.Name, - "appIConsensusAddr", appIConsensusAddr, - "epochLastBlock", epochLastBlock, + "claim", claim, "error", err) } else { - s.Logger.Debug("EnumerateSubmitClaimEventsSince:success", + s.Logger.Debug("findClaimSubmissionEventAndSucc:success", "service", s.Name, - "appIConsensusAddr", appIConsensusAddr, - "epochLastBlock", epochLastBlock) + "claim", claim, + "currEvent", curr, + "nextEvent", next, + ) } - return it, ic, err + return ic, curr, next, err } -func (s *Service) EnumerateSubmitClaimEventsSince( - EthConn *ethclient.Client, - context context.Context, - appIConsensusAddr Address, - epochLastBlock uint64, -) ( - IClaimSubmissionIterator, - *iconsensus.IConsensus, - error, -) { - ic, err := iconsensus.NewIConsensus(appIConsensusAddr, EthConn) +func (s *Service) submitClaimToBlockchain( + ic *iconsensus.IConsensus, + claim *claimRow, +) (hash, error) { + txHash := hash{} + lastBlockNumber := new(big.Int).SetUint64(claim.EpochLastBlock) + tx, err := ic.SubmitClaim(s.TxOpts, claim.AppContractAddress, + lastBlockNumber, claim.EpochHash) if err != nil { - return nil, nil, err + s.Logger.Error("submitClaimToBlockchain:failed", + "service", s.Name, + "appContractAddress", claim.AppContractAddress, + "claimHash", claim.EpochHash, + "error", err) + } else { + txHash = tx.Hash() + s.Logger.Debug("submitClaimToBlockchain:success", + "service", s.Name, + "appContractAddress", claim.AppContractAddress, + "claimHash", claim.EpochHash, + "TxHash", txHash) } - - it, err := ic.FilterClaimSubmission(&bind.FilterOpts{ - Context: context, - Start: epochLastBlock, - }, nil, nil) - - return &ClaimSubmissionIterator{iterator: it}, ic, nil + return txHash, err } -func (s *Service) pollTransaction(txHash Hash) (bool, *types.Receipt, error) { +func (s *Service) pollTransaction(txHash hash) (bool, *types.Receipt, error) { ready, receipt, err := s.PollTransaction(txHash) if err != nil { s.Logger.Error("PollTransaction:failed", @@ -212,7 +161,53 @@ func (s *Service) pollTransaction(txHash Hash) (bool, *types.Receipt, error) { return ready, receipt, err } -func (s *Service) PollTransaction(txHash Hash) (bool, *types.Receipt, error) { +// scan the event stream for a claimSubmission event that matches claim. +// return this event and its successor +func (s *Service) FindClaimSubmissionEventAndSucc( + claim *claimRow, +) ( + *iconsensus.IConsensus, + *claimSubmissionEvent, + *claimSubmissionEvent, + error, +) { + ic, err := iconsensus.NewIConsensus(claim.AppIConsensusAddress, s.EthConn) + if err != nil { + return nil, nil, nil, err + } + + it, err := ic.FilterClaimSubmission(&bind.FilterOpts{ + Context: s.Context, + Start: claim.EpochLastBlock, + }, nil, nil) + if err != nil { + return nil, nil, nil, err + } + + for it.Next() { + event := it.Event + lastBlock := event.LastProcessedBlockNumber.Uint64() + if claimMatchesEvent(claim, event) { + var succ *claimSubmissionEvent = nil + if it.Next() { + succ = it.Event + } + if it.Error() != nil { + return nil, nil, nil, it.Error() + } + return ic, event, succ, nil + } else if lastBlock > claim.EpochLastBlock { + err = fmt.Errorf("claim not found, searched up to %v", event) + } + } + if it.Error() != nil { + return nil, nil, nil, it.Error() + } + return ic, nil, nil, nil +} + +/* poll a transaction hash for its submission status and receipt */ +func (s *Service) PollTransaction(txHash hash) (bool, *types.Receipt, error) { _, isPending, err := s.EthConn.TransactionByHash(s.Context, txHash) if err != nil || isPending { return false, nil, err diff --git a/internal/evmreader/config/config.go b/internal/evmreader/config/config.go new file mode 100644 index 000000000..fcf87a404 --- /dev/null +++ b/internal/evmreader/config/config.go @@ -0,0 +1,53 @@ +package config + +import ( + "log/slog" + + . "github.com/cartesi/rollups-node/internal/config" +) + +type EVMReaderConfig struct { + LogLevel slog.Level + LogPrettyEnabled bool + PostgresEndpoint Redacted[string] + BlockchainHttpEndpoint Redacted[string] + BlockchainWsEndpoint Redacted[string] + DefaultBlock DefaultBlock + RetryPolicyMaxRetries uint64 + RetryPolicyMaxDelay Duration + ContractsInputBoxAddress string + ContractsInputBoxDeploymentBlockNumber int64 + BlockchainID uint64 +} + +func GetEVMReaderConfig() EVMReaderConfig { + return EVMReaderConfig{ + LogLevel: GetLogLevel(), + LogPrettyEnabled: false, + PostgresEndpoint: Redacted[string]{GetPostgresEndpoint()}, + BlockchainHttpEndpoint:Redacted[string]{GetBlockchainHttpEndpoint()}, + BlockchainWsEndpoint: Redacted[string]{GetBlockchainWsEndpoint()}, + DefaultBlock: GetEvmReaderDefaultBlock(), + RetryPolicyMaxRetries: GetEvmReaderRetryPolicyMaxRetries(), + RetryPolicyMaxDelay: GetEvmReaderRetryPolicyMaxDelay(), + ContractsInputBoxAddress: GetContractsInputBoxAddress(), + ContractsInputBoxDeploymentBlockNumber: GetContractsInputBoxDeploymentBlockNumber(), + BlockchainID: GetBlockchainId(), + } +} + +func (c *EVMReaderConfig) ToNodeConfig() NodeConfig { + return NodeConfig { + LogLevel: c.LogLevel, + LogPrettyEnabled: c.LogPrettyEnabled, + PostgresEndpoint: c.PostgresEndpoint, + BlockchainHttpEndpoint: c.BlockchainHttpEndpoint, + BlockchainWsEndpoint: c.BlockchainWsEndpoint, + EvmReaderDefaultBlock: c.DefaultBlock, + EvmReaderRetryPolicyMaxRetries: c.RetryPolicyMaxRetries, + EvmReaderRetryPolicyMaxDelay: c.RetryPolicyMaxDelay, + ContractsInputBoxAddress: c.ContractsInputBoxAddress, + ContractsInputBoxDeploymentBlockNumber: c.ContractsInputBoxDeploymentBlockNumber, + BlockchainID: c.BlockchainID, + } +} diff --git a/internal/repository/claimer.go b/internal/repository/claimer.go index ccf99b836..caa36d072 100644 --- a/internal/repository/claimer.go +++ b/internal/repository/claimer.go @@ -17,22 +17,34 @@ var ( ErrNoUpdate = fmt.Errorf("update did not take effect") ) -type ComputedClaim struct { - Hash common.Hash +type ClaimRow struct { EpochID uint64 + EpochIndex uint64 + EpochFirstBlock uint64 + EpochLastBlock uint64 + EpochHash Hash AppContractAddress Address AppIConsensusAddress Address - EpochLastBlock uint64 } -func (pg *Database) SelectComputedClaims(ctx context.Context) ([]ComputedClaim, error) { +// Retrieve the computed claim of each application with the smallest index. +// The query may return either 0 or 1 entries per application. +func (pg *Database) SelectOldestComputedClaimPerApp(ctx context.Context) ( + map[Address]ClaimRow, + error, +) { + // NOTE(mpolitzer): DISTINCT ON is a postgres extension. To implement + // this in SQLite there is an alternative using GROUP BY and HAVING + // clauses instead. query := ` - SELECT + SELECT DISTINCT ON(application_address) epoch.id, + epoch.index, + epoch.first_block, + epoch.last_block, epoch.claim_hash, application.contract_address, - application.iconsensus_address, - epoch.last_block + application.iconsensus_address FROM epoch INNER JOIN @@ -40,9 +52,10 @@ func (pg *Database) SelectComputedClaims(ctx context.Context) ([]ComputedClaim, ON epoch.application_address = application.contract_address WHERE - epoch.status = @status + epoch.status=@status ORDER BY - epoch.application_address ASC, epoch.index ASC` + application_address, index ASC; + ` args := pgx.NamedArgs{ "status": EpochStatusClaimComputed, @@ -52,23 +65,102 @@ func (pg *Database) SelectComputedClaims(ctx context.Context) ([]ComputedClaim, return nil, err } - var data ComputedClaim + var data ClaimRow scans := []any{ &data.EpochID, - &data.Hash, + &data.EpochIndex, + &data.EpochFirstBlock, + &data.EpochLastBlock, + &data.EpochHash, &data.AppContractAddress, &data.AppIConsensusAddress, + } + + results := map[Address]ClaimRow{} + _, err = pgx.ForEachRow(rows, scans, func() error { + results[data.AppContractAddress] = data + return nil + }) + return results, err +} + +// Retrieve the newest accepted claim of each application +func (pg *Database) SelectNewestAcceptedClaimPerApp(ctx context.Context) ( + map[Address]ClaimRow, + error, +) { + query := ` + SELECT DISTINCT ON(application_address) + epoch.id, + epoch.index, + epoch.first_block, + epoch.last_block, + epoch.claim_hash, + application.contract_address, + application.iconsensus_address + FROM + epoch + INNER JOIN + application + ON + epoch.application_address = application.contract_address + WHERE + epoch.status=@status + ORDER BY + application_address, index DESC; + ` + + args := pgx.NamedArgs{ + "status": EpochStatusClaimAccepted, + } + rows, err := pg.db.Query(ctx, query, args) + if err != nil { + return nil, err + } + + var data ClaimRow + scans := []any{ + &data.EpochID, + &data.EpochIndex, + &data.EpochFirstBlock, &data.EpochLastBlock, + &data.EpochHash, + &data.AppContractAddress, + &data.AppIConsensusAddress, } - var results []ComputedClaim + results := map[Address]ClaimRow{} _, err = pgx.ForEachRow(rows, scans, func() error { - results = append(results, data) + results[data.AppContractAddress] = data return nil }) return results, err } +func (pg *Database) SelectClaimPairsPerApp(ctx context.Context) ( + map[Address]ClaimRow, + map[Address]ClaimRow, + error, +) { + tx, err := pg.db.Begin(ctx) + if err != nil { + return nil, nil, err + } + defer tx.Commit(ctx) + + computed, err := pg.SelectOldestComputedClaimPerApp(ctx) + if err != nil { + return nil, nil, err + } + + accepted, err := pg.SelectNewestAcceptedClaimPerApp(ctx) + if err != nil { + return nil, nil, err + } + + return computed, accepted, err +} + func (pg *Database) UpdateEpochWithSubmittedClaim( ctx context.Context, id uint64, diff --git a/internal/repository/claimer_test.go b/internal/repository/claimer_test.go index c907582f6..201bcf026 100644 --- a/internal/repository/claimer_test.go +++ b/internal/repository/claimer_test.go @@ -6,9 +6,11 @@ package repository import ( "context" "testing" + . "github.com/cartesi/rollups-node/internal/model" - "github.com/ethereum/go-ethereum/common" + // "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/test/tooling/db" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" ) @@ -32,136 +34,228 @@ func setup(t *testing.T, ctx context.Context) (*require.Assertions, *Database, e func TestClaimerRepository(t *testing.T) { ctx := context.Background() - t.Run("EmptySelectComputedClaims", func(t *testing.T) { + // Must return an empty array for a database with no computed claims + t.Run("EmptyArrayOnCleanDB", func(t *testing.T) { require, database, err := setup(t, ctx) + require.Nil(err) - computedClaims, err := database.SelectComputedClaims(ctx) + computed, err := database.SelectOldestComputedClaimPerApp(ctx) require.Nil(err) - require.Empty(computedClaims) + require.Empty(computed) }) - t.Run("SelectComputedClaims", func(t *testing.T) { + // Check that we select the correct epochs on a "complex" situation. + // The query must return 0 or 1 entries per application with the smallest + // epoch index and status == 'COMPUTED_CLAIM'. + // + // Application 0 has 4 epochs, 1 already accepted (with lowest index) + // and 3 computed (the candidates to be selected). + // + // Application 1 has no computed epochs and must not be returned. + // + // Application 2 has 3 epochs, all computed. + // + // We expect 2 values on the array. + // 1) {index = 1, apps[0], ...} + // 2) {index = 2, apps[1], ...} + // + t.Run("MustRetrieveOldestComputedClaimForEachApp", func(t *testing.T) { require, database, err := setup(t, ctx) + require.Nil(err) - app := Application{ - Id: 1, - ContractAddress: common.HexToAddress("deadbeef"), - TemplateHash: common.HexToHash("deadbeef"), - LastProcessedBlock: 1, - Status: ApplicationStatusRunning, - IConsensusAddress: common.HexToAddress("ffffff"), + apps := []Application{ + { + Id: 0, + ContractAddress: common.HexToAddress("0"), + Status: ApplicationStatusRunning, + }, { + Id: 1, + ContractAddress: common.HexToAddress("1"), + Status: ApplicationStatusRunning, + }, { + Id: 2, + ContractAddress: common.HexToAddress("2"), + Status: ApplicationStatusRunning, + }, + } + for _, app := range(apps) { + _, err = database.InsertApplication(ctx, &app) + require.Nil(err) } - _, err = database.InsertApplication(ctx, &app) - require.Nil(err) - lastBlock := []uint64{99, 200} epochs := []Epoch{ + // epochs of apps[0] + { + Index: 3, // not this + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, { + Index: 2, // not this + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, { + Index: 1, // this! + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, { + Index: 0, // not this + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimAccepted, + }, + + // epochs of apps[1] { - Id: 1, Index: 0, - FirstBlock: 0, - LastBlock: lastBlock[0], - AppAddress: app.ContractAddress, + AppAddress: apps[1].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimAccepted, + }, + + // epochs of apps[2] + { + Index: 3, // not this + AppAddress: apps[2].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, { + Index: 2, // this! + AppAddress: apps[2].ContractAddress, ClaimHash: &common.Hash{}, TransactionHash: nil, Status: EpochStatusClaimComputed, - },{ - Id: 2, - Index: 1, - FirstBlock: lastBlock[0]+1, - LastBlock: lastBlock[1], - AppAddress: app.ContractAddress, + }, { + Index: 4, // not this + AppAddress: apps[2].ContractAddress, ClaimHash: &common.Hash{}, TransactionHash: nil, Status: EpochStatusClaimComputed, }, } - for _, epoch := range(epochs) { _, err = database.InsertEpoch(ctx, &epoch) require.Nil(err) } - computedClaims, err := database.SelectComputedClaims(ctx) + computed, err := database.SelectOldestComputedClaimPerApp(ctx) require.Nil(err) - require.Len(computedClaims, 2) + require.Len(computed, 2) - for i, computedClaim := range(computedClaims) { - require.Equal(computedClaim.EpochID, epochs[i].Id) - require.Equal(computedClaim.Hash, *epochs[i].ClaimHash) - require.Equal(computedClaim.AppContractAddress, app.ContractAddress) - require.Equal(computedClaim.AppIConsensusAddress, app.IConsensusAddress) - } + require.Equal(computed[apps[0].ContractAddress].EpochIndex, uint64(1)) + require.Equal(computed[apps[0].ContractAddress].AppContractAddress, + apps[0].ContractAddress) + + require.Equal(computed[apps[2].ContractAddress].EpochIndex, uint64(2)) + require.Equal(computed[apps[2].ContractAddress].AppContractAddress, + apps[2].ContractAddress) }) - t.Run("TestUpdateEpochWithSubmittedClaim", func(t *testing.T) { + t.Run("MustRetrieveNewestComputedClaimForEachApp", func(t *testing.T) { require, database, err := setup(t, ctx) - - app := Application{ - Id: 1, - ContractAddress: common.HexToAddress("deadbeef"), - TemplateHash: common.HexToHash("deadbeef"), - LastProcessedBlock: 1, - Status: ApplicationStatusRunning, - IConsensusAddress: common.HexToAddress("ffffff"), - } - _, err = database.InsertApplication(ctx, &app) require.Nil(err) - epoch := Epoch{ - Id: 1, - Index: 0, - FirstBlock: 0, - LastBlock: 100, - AppAddress: app.ContractAddress, - ClaimHash: &common.Hash{}, - TransactionHash: nil, - Status: EpochStatusClaimComputed, + apps := []Application{ + { + Id: 0, + ContractAddress: common.HexToAddress("0"), + Status: ApplicationStatusRunning, + }, { + Id: 1, + ContractAddress: common.HexToAddress("1"), + Status: ApplicationStatusRunning, + }, { + Id: 2, + ContractAddress: common.HexToAddress("2"), + Status: ApplicationStatusRunning, + }, + } + for _, app := range(apps) { + _, err = database.InsertApplication(ctx, &app) + require.Nil(err) } - id, err := database.InsertEpoch(ctx, &epoch) - require.Nil(err) - - transactionHash := common.HexToHash("0x10") - err = database.UpdateEpochWithSubmittedClaim(ctx, id, transactionHash) - require.Nil(err) - - updatedEpoch, err := database.GetEpoch(ctx, epoch.Index, epoch.AppAddress) - require.Nil(err) - require.Equal(updatedEpoch.Status, EpochStatusClaimSubmitted) - require.Equal(updatedEpoch.TransactionHash, &transactionHash) - }) + epochs := []Epoch{ + // epochs of apps[0] + { + Index: 3, // not this + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, { + Index: 2, // not this + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, { + Index: 1, // this! + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimAccepted, + }, { + Index: 0, // not this + AppAddress: apps[0].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimAccepted, + }, - t.Run("TestFailUpdateEpochWithSubmittedClaim", func(t *testing.T) { - require, database, err := setup(t, ctx) + // epochs of apps[1] + { + Index: 0, // not this + AppAddress: apps[1].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, - app := Application{ - Id: 1, - ContractAddress: common.HexToAddress("deadbeef"), - TemplateHash: common.HexToHash("deadbeef"), - LastProcessedBlock: 1, - Status: ApplicationStatusRunning, - IConsensusAddress: common.HexToAddress("ffffff"), + // epochs of apps[2] + { + Index: 3, // not this + AppAddress: apps[2].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, { + Index: 2, // this! + AppAddress: apps[2].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimAccepted, + }, { + Index: 4, // not this + AppAddress: apps[2].ContractAddress, + ClaimHash: &common.Hash{}, + TransactionHash: nil, + Status: EpochStatusClaimComputed, + }, } - _, err = database.InsertApplication(ctx, &app) - require.Nil(err) - - transactionHash := common.HexToHash("0x10") - epoch := Epoch{ - Id: 1, - Index: 0, - FirstBlock: 0, - LastBlock: 100, - AppAddress: app.ContractAddress, - ClaimHash: &common.Hash{}, - TransactionHash: &transactionHash, - Status: EpochStatusClaimSubmitted, + for _, epoch := range(epochs) { + _, err = database.InsertEpoch(ctx, &epoch) + require.Nil(err) } - id, err := database.InsertEpoch(ctx, &epoch) + accepted, err := database.SelectNewestAcceptedClaimPerApp(ctx) require.Nil(err) + require.Len(accepted, 2) + + require.Equal(accepted[apps[0].ContractAddress].EpochIndex, uint64(1)) + require.Equal(accepted[apps[0].ContractAddress].AppContractAddress, + apps[0].ContractAddress) - err = database.UpdateEpochWithSubmittedClaim(ctx, id, transactionHash) - require.Equal(err, ErrNoUpdate) + require.Equal(accepted[apps[2].ContractAddress].EpochIndex, uint64(2)) + require.Equal(accepted[apps[2].ContractAddress].AppContractAddress, + apps[2].ContractAddress) }) } diff --git a/pkg/service/service.go b/pkg/service/service.go index 55f2abd46..68892efb1 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -240,7 +240,7 @@ func (s *Service) Reload() []error { errs := s.Impl.Reload() elapsed := time.Since(start) - if errs != nil { + if len(errs) > 0 { s.Logger.Error("Reload", "service", s.Name, "duration", elapsed, @@ -280,16 +280,16 @@ func (s *Service) Stop(force bool) []error { elapsed := time.Since(start) s.Running.Store(false) - if errs != nil { + if len(errs) > 0 { s.Logger.Error("Stop", - "force", force, "service", s.Name, + "force", force, "duration", elapsed, "error", errs) } else { s.Logger.Info("Stop", - "force", force, "service", s.Name, + "force", force, "duration", elapsed) } return nil