diff --git a/.gitignore b/.gitignore index c3e06ccc4..a6667a6b2 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ machine-snapshot/** /cartesi-rollups-node /cartesi-rollups-advancer /cartesi-rollups-validator +/cartesi-rollups-claimer diff --git a/Makefile b/Makefile index 53c1611bd..b5e18381f 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ CLAIMER := cmd/authority-claimer/target/$(BUILD_TYPE)/cartesi-rollups-authority- RUST_ARTIFACTS := $(CLAIMER) # Go artifacts -GO_ARTIFACTS := cartesi-rollups-node cartesi-rollups-cli cartesi-rollups-evm-reader cartesi-rollups-advancer cartesi-rollups-validator +GO_ARTIFACTS := cartesi-rollups-node cartesi-rollups-cli cartesi-rollups-evm-reader cartesi-rollups-advancer cartesi-rollups-validator cartesi-rollups-claimer # fixme(vfusco): path on all oses CGO_CFLAGS:= -I$(PREFIX)/include diff --git a/cmd/cartesi-rollups-claimer/main.go b/cmd/cartesi-rollups-claimer/main.go new file mode 100644 index 000000000..6f3f3be58 --- /dev/null +++ b/cmd/cartesi-rollups-claimer/main.go @@ -0,0 +1,16 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package main + +import ( + "github.com/cartesi/rollups-node/cmd/cartesi-rollups-claimer/root" + "os" +) + +func main() { + err := root.Cmd.Execute() + if err != nil { + os.Exit(1) + } +} diff --git a/cmd/cartesi-rollups-claimer/root/root.go b/cmd/cartesi-rollups-claimer/root/root.go new file mode 100644 index 000000000..836726af5 --- /dev/null +++ b/cmd/cartesi-rollups-claimer/root/root.go @@ -0,0 +1,70 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package root + +import ( + "log/slog" + + "github.com/cartesi/rollups-node/internal/claimer" + "github.com/cartesi/rollups-node/internal/config" + "github.com/cartesi/rollups-node/pkg/service" + "github.com/spf13/cobra" +) + +var ( + // Should be overridden during the final release build with ldflags + // to contain the actual version number + buildVersion = "devel" + claimerService = claimer.Service{} + createInfo = claimer.CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "claimer", + ProcOwner: true, + EnableSignalHandling: true, + TelemetryCreate: true, + TelemetryAddress: ":8081", + Impl: &claimerService, + }, + } +) + +var Cmd = &cobra.Command{ + Use: createInfo.Name, + Short: "Runs " + createInfo.Name, + Long: "Runs " + createInfo.Name + " in standalone mode", + Run: run, +} + +func init() { + c := config.FromEnv() + createInfo.Auth = c.Auth + createInfo.BlockchainHttpEndpoint = c.BlockchainHttpEndpoint + createInfo.PostgresEndpoint = c.PostgresEndpoint + createInfo.PollInterval = c.ClaimerPollingInterval + createInfo.LogLevel = map[slog.Level]string{ + slog.LevelDebug: "debug", + slog.LevelInfo: "info", + slog.LevelWarn: "warn", + slog.LevelError: "error", + }[c.LogLevel] + + Cmd.Flags().StringVar(&createInfo.TelemetryAddress, + "telemetry-address", createInfo.TelemetryAddress, + "health check and metrics address and port") + Cmd.Flags().StringVar(&createInfo.BlockchainHttpEndpoint.Value, + "blockchain-http-endpoint", createInfo.BlockchainHttpEndpoint.Value, + "blockchain http endpoint") + Cmd.Flags().DurationVar(&createInfo.PollInterval, + "poll-interval", createInfo.PollInterval, + "poll interval") + Cmd.Flags().StringVar(&createInfo.LogLevel, + "log-level", createInfo.LogLevel, + "log level: debug, info, warn, error.") +} + +func run(cmd *cobra.Command, args []string) { + cobra.CheckErr(claimer.Create(createInfo, &claimerService)) + claimerService.CreateDefaultHandlers("/" + claimerService.Name) + cobra.CheckErr(claimerService.Serve()) +} diff --git a/internal/claimer/auth.go b/internal/claimer/auth.go new file mode 100644 index 000000000..a458bedb1 --- /dev/null +++ b/internal/claimer/auth.go @@ -0,0 +1,68 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package claimer + +import ( + "context" + "fmt" + + "github.com/cartesi/rollups-node/internal/config" + signtx "github.com/cartesi/rollups-node/internal/kms" + "github.com/cartesi/rollups-node/pkg/ethutil" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + + aws "github.com/aws/aws-sdk-go-v2/aws" + aws_cfg "github.com/aws/aws-sdk-go-v2/config" + aws_kms "github.com/aws/aws-sdk-go-v2/service/kms" +) + +var ( + ENoAuth = fmt.Errorf("error: unimplemented authentication method") +) + +func CreateSignerFromAuth( + auth config.Auth, + ctx context.Context, + client *ethclient.Client, +) ( + *bind.TransactOpts, + error, +) { + chainID, err := client.ChainID(ctx) + if err != nil { + return nil, err + } + + switch auth := auth.(type) { + case config.AuthPrivateKey: + key, err := crypto.HexToECDSA(auth.PrivateKey.Value) + if err != nil { + return nil, err + } + return bind.NewKeyedTransactorWithChainID(key, chainID) + case config.AuthMnemonic: + privateKey, err := ethutil.MnemonicToPrivateKey( + auth.Mnemonic.Value, uint32(auth.AccountIndex.Value)) + if err != nil { + return nil, err + } + return bind.NewKeyedTransactorWithChainID(privateKey, chainID) + case config.AuthAWS: + awsc, err := aws_cfg.LoadDefaultConfig(ctx) + if err != nil { + return nil, err + } + kms := aws_kms.NewFromConfig(awsc) + // TODO: option for an alternative endpoint + //kms := aws_kms.NewFromConfig(awsc, func(o *aws_kms.Options) { + // o.BaseEndpoint = aws.String(auth.EndpointURL.Value) + //}) + return signtx.CreateAWSTransactOpts(ctx, kms, + aws.String(auth.KeyID.Value), types.NewEIP155Signer(chainID)) + } + return nil, ENoAuth +} diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go new file mode 100644 index 000000000..e60a3a1dd --- /dev/null +++ b/internal/claimer/claimer.go @@ -0,0 +1,204 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package claimer + +import ( + "context" + + . "github.com/cartesi/rollups-node/internal/config" + . "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/service" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + . "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +type CreateInfo struct { + service.CreateInfo + + Auth Auth + Signer *bind.TransactOpts + + BlockchainHttpEndpoint Redacted[string] + EthConn *ethclient.Client + + PostgresEndpoint Redacted[string] + DBConn *Database +} + +type Service struct { + service.Service + + DBConn *Database + EthConn *ethclient.Client + Signer *bind.TransactOpts + ClaimsInFlight map[Hash]Hash // claimHash -> txHash +} + +func Create(ci CreateInfo, s *Service) error { + var err error + + err = service.Create(&ci.CreateInfo, &s.Service) + if err != nil { + return err + } + + if s.EthConn == nil { + if ci.EthConn == nil { + ci.EthConn, err = ethclient.Dial(ci.BlockchainHttpEndpoint.Value) + if err != nil { + return err + } + } + s.EthConn = ci.EthConn + } + + if s.DBConn == nil { + if ci.DBConn == nil { + ci.DBConn, err = Connect(s.Context, ci.PostgresEndpoint.Value) + if err != nil { + return err + } + } + s.DBConn = ci.DBConn + } + + if s.ClaimsInFlight == nil { + s.ClaimsInFlight = map[Hash]Hash{} + } + + if s.Signer == nil { + if ci.Signer == nil { + ci.Signer, err = CreateSignerFromAuth(ci.Auth, s.Context, s.EthConn) + if err != nil { + return err + } + s.Signer = ci.Signer + } + } + + return err +} + +func (s *Service) Alive() bool { + return true +} + +func (s *Service) Ready() bool { + return true +} + +func (s *Service) Reload() []error { + return nil +} + +func (s *Service) Stop(bool) []error { + return nil +} + +func (s *Service) Tick() []error { + err := s.submitClaimsAndUpdateDatabase(s) + if err != nil { + return []error{err} + } + return nil +} + +func (s *Service) submitClaimsAndUpdateDatabase(se SideEffects) error { + claims, err := se.selectComputedClaims() + if err != nil { + return err + } + + claimFromHash := make(map[Hash]*ComputedClaim) + for i := 0; i < len(claims); i++ { + claimFromHash[claims[i].Hash] = &claims[i] + } + + // check claims in flight + for claimHash, txHash := range s.ClaimsInFlight { + ready, receipt, err := se.pollTransaction(txHash) + if err != nil { + return err + } + if !ready { + continue + } + + if claim, ok := claimFromHash[claimHash]; ok { + err = se.updateEpochWithSubmittedClaim( + s.DBConn, + s.Context, + claim, + receipt.TxHash) + if err != nil { + return err + } + delete(s.ClaimsInFlight, claimHash) + delete(claimFromHash, claimHash) + s.Logger.Info("claimer: Claim submitted", + "app", claim.AppContractAddress, + "claim", claimHash, + "last_block", claim.EpochLastBlock, + "tx", receipt.TxHash) + } + } + + // check event logs for the remaining claims, submit if not found + for i := 0; i < len(claims); i++ { + _, isSelected := claimFromHash[claims[i].Hash] + _, isInFlight := s.ClaimsInFlight[claims[i].Hash] + if !isSelected || isInFlight { + continue + } + + it, inst, err := se.enumerateSubmitClaimEventsSince( + s.EthConn, s.Context, + claims[i].AppIConsensusAddress, + claims[i].EpochLastBlock) + if err != nil { + return err + } + + for event, err := range it { + if err != nil { + return err + } + + hash := Hash(event.Claim) + claim, ok := claimFromHash[hash] + if ok { + err := se.updateEpochWithSubmittedClaim( + s.DBConn, + s.Context, + claim, + event.Raw.TxHash) + if err != nil { + return err + } + delete(claimFromHash, hash) + } + } + + // submit if not found in the logs (fetch from hash again, can be stale) + if claim, ok := claimFromHash[claims[i].Hash]; ok { + txHash, err := se.submitClaimToBlockchain(inst, s.Signer, claim) + if err != nil { + return err + } + s.ClaimsInFlight[claims[i].Hash] = txHash + delete(claimFromHash, claim.Hash) + } + } + return nil +} + +func (s *Service) Start(context context.Context, ready chan<- struct{}) error { + ready <- struct{}{} + return s.Serve() +} +func (s *Service) String() string { + return s.Name +} diff --git a/internal/claimer/claimer_test.go b/internal/claimer/claimer_test.go new file mode 100644 index 000000000..1f0e69304 --- /dev/null +++ b/internal/claimer/claimer_test.go @@ -0,0 +1,238 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package claimer + +import ( + "context" + "iter" + "log/slog" + "testing" + + . "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + . "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type ServiceMock struct { + mock.Mock + Service +} + +func (m *ServiceMock) submitClaimToBlockchain( + instance *iconsensus.IConsensus, + signer *bind.TransactOpts, + claim *ComputedClaim, +) (Hash, error) { + args := m.Called(nil, nil, claim) + return args.Get(0).(Hash), args.Error(1) +} + +func (m *ServiceMock) selectComputedClaims() ([]ComputedClaim, error) { + args := m.Called() + return args.Get(0).([]ComputedClaim), args.Error(1) +} + +func (m *ServiceMock) updateEpochWithSubmittedClaim( + dbConn *Database, + context context.Context, + claim *ComputedClaim, + txHash Hash, +) error { + args := m.Called(nil, nil, claim, txHash) + return args.Error(0) +} + +func (m *ServiceMock) enumerateSubmitClaimEventsSince( + ethConn *ethclient.Client, + context context.Context, + appIConsensusAddr Address, + epochLastBlock uint64, +) ( + iter.Seq2[*iconsensus.IConsensusClaimSubmission, error], + *iconsensus.IConsensus, + error, +) { + args := m.Called() + return args.Get(0).(iter.Seq2[*iconsensus.IConsensusClaimSubmission, error]), + args.Get(1).(*iconsensus.IConsensus), + args.Error(2) +} + +func (m *ServiceMock) pollTransaction(txHash Hash) (bool, *types.Receipt, error) { + args := m.Called(txHash) + return args.Bool(0), + args.Get(1).(*types.Receipt), + args.Error(2) +} + +func iteratorFromList[E any](slice []E) iter.Seq2[*E, error] { + return func(yield func(*E, error) bool) { + for _, element := range slice { + cont := yield(&element, nil) + if !cont { + return + } + } + } +} + +// ////////////////////////////////////////////////////////////////////////////// +// Test +// ////////////////////////////////////////////////////////////////////////////// + +// Do notghing when there are no claims to process +func TestEmptySelectComputedClaimsDoesNothing(t *testing.T) { + m := &ServiceMock{} + m.ClaimsInFlight = make(map[Hash]Hash) + + m.On("selectComputedClaims").Return([]ComputedClaim{}, nil) + + m.submitClaimsAndUpdateDatabase(m) + + m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) + m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) + m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 0) + m.AssertNumberOfCalls(t, "pollTransaction", 0) +} + +// Got a claim. +// Submit if new (by checking the event logs) +func TestSubmitNewClaim(t *testing.T) { + m := &ServiceMock{} + + newClaimHash := HexToHash("0x01") + newClaimTxHash := HexToHash("0x10") + newClaim := ComputedClaim{ + Hash: newClaimHash, + } + m.ClaimsInFlight = map[Hash]Hash{} + m.On("selectComputedClaims").Return([]ComputedClaim{ + newClaim, + }, nil) + m.On("submitClaimToBlockchain", nil, nil, &newClaim). + Return(newClaimTxHash, nil) + m.On("enumerateSubmitClaimEventsSince"). + Return(iteratorFromList([]iconsensus.IConsensusClaimSubmission{}), &iconsensus.IConsensus{}, nil) + m.On("pollTransaction", newClaimTxHash). + Return(false, &types.Receipt{}, nil) + assert.Equal(t, len(m.ClaimsInFlight), 0) + + m.submitClaimsAndUpdateDatabase(m) + + assert.Equal(t, len(m.ClaimsInFlight), 1) + m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 1) + m.AssertNumberOfCalls(t, "pollTransaction", 0) + m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 1) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) +} + +// Query the blockchain for the submitClaim transaction, it may not be ready yet +func TestClaimInFlightNotReadyDoesNothing(t *testing.T) { + m := &ServiceMock{} + + claimInFlightHash := HexToHash("0x01") + claimInFlightTxHash := HexToHash("0x10") + claimInFlight := ComputedClaim{ + Hash: claimInFlightHash, + } + m.ClaimsInFlight = map[Hash]Hash{ + claimInFlightHash: claimInFlightTxHash, + } + m.On("selectComputedClaims").Return([]ComputedClaim{ + claimInFlight, + }, nil) + m.On("pollTransaction", claimInFlightTxHash). + Return(false, &types.Receipt{}, nil) + assert.Equal(t, len(m.ClaimsInFlight), 1) + + m.submitClaimsAndUpdateDatabase(m) + + assert.Equal(t, len(m.ClaimsInFlight), 1) + m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 0) + m.AssertNumberOfCalls(t, "pollTransaction", 1) + m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 0) +} + +// Update ClaimsInFlight and the database when a submitClaim transaction is completed +func TestUpdateClaimInFlightViaPollTransaction(t *testing.T) { + m := &ServiceMock{} + m.Logger = slog.Default() + + claimInFlightHash := HexToHash("0x01") + claimInFlightTxHash := HexToHash("0x10") + claimInFlight := ComputedClaim{ + Hash: claimInFlightHash, + } + m.ClaimsInFlight = map[Hash]Hash{ + claimInFlightHash: claimInFlightTxHash, + } + m.On("selectComputedClaims").Return([]ComputedClaim{ + claimInFlight, + }, nil) + m.On("pollTransaction", claimInFlightTxHash). + Return(true, &types.Receipt{TxHash: claimInFlightTxHash}, nil) + m.On("updateEpochWithSubmittedClaim", + nil, nil, &claimInFlight, claimInFlightTxHash).Return(nil) + assert.Equal(t, len(m.ClaimsInFlight), 1) + + m.submitClaimsAndUpdateDatabase(m) + + assert.Equal(t, len(m.ClaimsInFlight), 0) + m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 0) + m.AssertNumberOfCalls(t, "pollTransaction", 1) + m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 1) +} + +// This shouldn't happen normally, +// but a submitClaim transaction may be on the blockchain but the database not know it. +// The blockchain is the source of truth in any case. +// So search for the transaction in the event logs. +// And if found, update the database. +func TestUpdateClaimViaEventLog(t *testing.T) { + m := &ServiceMock{} + + existingClaimHash := HexToHash("0x01") + existingClaimTxHash := HexToHash("0x10") + existingClaim := ComputedClaim{ + Hash: existingClaimHash, + } + m.ClaimsInFlight = map[Hash]Hash{} + m.On("selectComputedClaims").Return([]ComputedClaim{ + existingClaim, + }, nil) + m.On("updateEpochWithSubmittedClaim", + nil, nil, &existingClaim, existingClaimTxHash).Return(nil) + m.On("enumerateSubmitClaimEventsSince"). + Return(iteratorFromList([]iconsensus.IConsensusClaimSubmission{ + { + Claim: existingClaim.Hash, + Raw: types.Log{ + TxHash: existingClaimTxHash, + }, + }, + }), &iconsensus.IConsensus{}, nil) + assert.Equal(t, len(m.ClaimsInFlight), 0) + + m.submitClaimsAndUpdateDatabase(m) + + assert.Equal(t, len(m.ClaimsInFlight), 0) + m.AssertNumberOfCalls(t, "enumerateSubmitClaimEventsSince", 1) + m.AssertNumberOfCalls(t, "pollTransaction", 0) + m.AssertNumberOfCalls(t, "selectComputedClaims", 1) + m.AssertNumberOfCalls(t, "submitClaimToBlockchain", 0) + m.AssertNumberOfCalls(t, "updateEpochWithSubmittedClaim", 1) +} diff --git a/internal/claimer/side-effects.go b/internal/claimer/side-effects.go new file mode 100644 index 000000000..8c7ab1bc1 --- /dev/null +++ b/internal/claimer/side-effects.go @@ -0,0 +1,216 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package claimer + +import ( + "context" + "iter" + "math/big" + + //. "github.com/cartesi/rollups-node/internal/config" + . "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + . "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +type SideEffects interface { + submitClaimToBlockchain( + instance *iconsensus.IConsensus, + signer *bind.TransactOpts, + claim *ComputedClaim, + ) (Hash, error) + + selectComputedClaims() ([]ComputedClaim, error) + + updateEpochWithSubmittedClaim( + DBConn *Database, + context context.Context, + claim *ComputedClaim, + txHash Hash, + ) error + + enumerateSubmitClaimEventsSince( + EthConn *ethclient.Client, + context context.Context, + appIConsensusAddr Address, + epochLastBlock uint64, + ) ( + iter.Seq2[*iconsensus.IConsensusClaimSubmission, error], + *iconsensus.IConsensus, + error) + + pollTransaction(txHash Hash) (bool, *types.Receipt, error) +} + +func (s *Service) submitClaimToBlockchain( + instance *iconsensus.IConsensus, + signer *bind.TransactOpts, + claim *ComputedClaim, +) (Hash, error) { + txHash := Hash{} + lastBlockNumber := new(big.Int).SetUint64(claim.EpochLastBlock) + tx, err := instance.SubmitClaim(signer, claim.AppContractAddress, + lastBlockNumber, claim.Hash) + if err != nil { + s.Logger.Error("submitClaimToBlockchain:failed", + "service", s.Name, + "appContractAddress", claim.AppContractAddress, + "claimHash", claim.Hash, + "txHash", tx.Hash(), + "error", err) + } else { + txHash = tx.Hash() + s.Logger.Debug("SubmitClaimToBlockchain:success", + "service", s.Name, + "appContractAddress", claim.AppContractAddress, + "claimHash", claim.Hash, + "TxHash", txHash) + } + return txHash, err +} + +func (s *Service) selectComputedClaims() ([]ComputedClaim, error) { + claims, err := s.DBConn.SelectComputedClaims(s.Context) + if err != nil { + s.Logger.Error("SelectComputedClaims:failed", + "service", s.Name, + "error", err) + } else { + var ids []uint64 + for _, claim := range claims { + ids = append(ids, claim.EpochID) + } + s.Logger.Debug("SelectComputedClaims:success", + "service", s.Name, + "claims", len(claims), + "ids", ids, + "inFlight", len(s.ClaimsInFlight)) + } + return claims, err +} + +/* update the database epoch status to CLAIM_SUBMITTED and add a transaction hash */ +func (s *Service) updateEpochWithSubmittedClaim( + DBConn *Database, + context context.Context, + claim *ComputedClaim, + txHash Hash, +) error { + _, err := DBConn.UpdateEpochWithSubmittedClaim(context, claim.EpochID, txHash) + if err != nil { + s.Logger.Error("UpdateEpochWithSubmittedClaim:failed", + "service", s.Name, + "appContractAddress", claim.AppContractAddress, + "hash", claim.Hash, + "txHash", txHash, + "error", err) + } else { + s.Logger.Debug("UpdateEpochWithSubmittedClaim:success", + "service", s.Name, + "appContractAddress", claim.AppContractAddress, + "hash", claim.Hash, + "txHash", txHash) + } + return err +} + +func (s *Service) enumerateSubmitClaimEventsSince( + EthConn *ethclient.Client, + context context.Context, + appIConsensusAddr Address, + epochLastBlock uint64, +) ( + iter.Seq2[*iconsensus.IConsensusClaimSubmission, error], + *iconsensus.IConsensus, + error, +) { + it, ic, err := s.EnumerateSubmitClaimEventsSince( + EthConn, context, appIConsensusAddr, epochLastBlock) + + if err != nil { + s.Logger.Error("EnumerateSubmitClaimEventsSince:failed", + "service", s.Name, + "appIConsensusAddr", appIConsensusAddr, + "epochLastBlock", epochLastBlock, + "error", err) + } else { + s.Logger.Debug("EnumerateSubmitClaimEventsSince:success", + "service", s.Name, + "appIConsensusAddr", appIConsensusAddr, + "epochLastBlock", epochLastBlock) + } + return it, ic, err +} + +func (s *Service) EnumerateSubmitClaimEventsSince( + EthConn *ethclient.Client, + context context.Context, + appIConsensusAddr Address, + epochLastBlock uint64, +) ( + iter.Seq2[*iconsensus.IConsensusClaimSubmission, error], + *iconsensus.IConsensus, + error, +) { + ic, err := iconsensus.NewIConsensus(appIConsensusAddr, EthConn) + if err != nil { + return nil, nil, err + } + + it, err := ic.FilterClaimSubmission(&bind.FilterOpts{ + Context: context, + Start: epochLastBlock, + }, nil, nil) + + // make an iterator for the events + return func(yield func(*iconsensus.IConsensusClaimSubmission, error) bool) { + if !it.Next() { + return + } + cont := yield(it.Event, it.Error()) + if !cont { + return + } + }, ic, nil +} + +func (s *Service) pollTransaction(txHash Hash) (bool, *types.Receipt, error) { + ready, receipt, err := s.PollTransaction(txHash) + if err != nil { + s.Logger.Error("PollTransaction:failed", + "service", s.Name, + "tx", txHash, + "error", err) + } else if ready { + s.Logger.Debug("PollTransaction:success", + "service", s.Name, + "tx", txHash, + "ready", ready, + "blockNumber", receipt.BlockNumber) + } else { + s.Logger.Debug("PollTransaction:pending", + "service", s.Name, + "tx", txHash, + "ready", ready) + } + return ready, receipt, err +} + +func (s *Service) PollTransaction(txHash Hash) (bool, *types.Receipt, error) { + _, isPending, err := s.EthConn.TransactionByHash(s.Context, txHash) + if err != nil || isPending { + return false, nil, err + } + + receipt, err := s.EthConn.TransactionReceipt(s.Context, txHash) + if err != nil { + return false, nil, err + } + + return receipt.Status == 1, receipt, err +} diff --git a/internal/node/services.go b/internal/node/services.go index 8c5286cb5..4d98ed26d 100644 --- a/internal/node/services.go +++ b/internal/node/services.go @@ -10,11 +10,13 @@ import ( "os" advancerservice "github.com/cartesi/rollups-node/internal/advancer/service" + claimerservice "github.com/cartesi/rollups-node/internal/claimer" "github.com/cartesi/rollups-node/internal/config" evmreaderservice "github.com/cartesi/rollups-node/internal/evmreader/service" "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/internal/services" "github.com/cartesi/rollups-node/internal/validator" + "github.com/cartesi/rollups-node/pkg/service" ) // We use an enum to define the ports of each service and avoid conflicts. @@ -107,6 +109,7 @@ func newSupervisorService( s = append(s, newAdvancerService(c, database, serveMux)) s = append(s, newValidatorService(c, database)) s = append(s, newHttpService(c, serveMux)) + s = append(s, newClaimerService(c, database)) supervisor := services.SupervisorService{ Name: "rollups-node", @@ -150,3 +153,33 @@ func newValidatorService(c config.NodeConfig, database *repository.Database) ser c.ValidatorPollingInterval, ) } + +func newClaimerService(c config.NodeConfig, database *repository.Database) services.Service { + claimerService := claimerservice.Service{} + createInfo := claimerservice.CreateInfo{ + Auth: c.Auth, + DBConn: database, + PostgresEndpoint: c.PostgresEndpoint, + BlockchainHttpEndpoint: c.BlockchainHttpEndpoint, + CreateInfo: service.CreateInfo{ + Name: "claimer", + PollInterval: c.ClaimerPollingInterval, + Impl: &claimerService, + ProcOwner: true, // TODO: Remove this after updating supervisor + LogLevel: map[slog.Level]string{ // reverse it to string + slog.LevelDebug: "debug", + slog.LevelInfo: "info", + slog.LevelWarn: "warn", + slog.LevelError: "error", + }[c.LogLevel], + }, + } + + err := claimerservice.Create(createInfo, &claimerService) + if err != nil { + claimerService.Logger.Error("Fatal", + "service", claimerService.Name, + "error", err) + } + return &claimerService +} diff --git a/internal/repository/claimer.go b/internal/repository/claimer.go new file mode 100644 index 000000000..093f3c668 --- /dev/null +++ b/internal/repository/claimer.go @@ -0,0 +1,89 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package repository + +import ( + "context" + + . "github.com/cartesi/rollups-node/internal/model" + "github.com/ethereum/go-ethereum/common" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type ComputedClaim struct { + Hash common.Hash + EpochID uint64 + AppContractAddress Address + AppIConsensusAddress Address + EpochLastBlock uint64 +} + +func (pg *Database) SelectComputedClaims(ctx context.Context) ([]ComputedClaim, error) { + query := ` + SELECT + epoch.id, + epoch.claim_hash, + application.contract_address, + application.iconsensus_address, + epoch.last_block + FROM + epoch + INNER JOIN + application + ON + epoch.application_address = application.contract_address + WHERE + epoch.status = @status + ORDER BY + epoch.application_address ASC, epoch.index ASC` + + args := pgx.NamedArgs{ + "status": EpochStatusClaimComputed, + } + rows, err := pg.db.Query(ctx, query, args) + if err != nil { + return nil, err + } + + var data ComputedClaim + scans := []any{ + &data.EpochID, + &data.Hash, + &data.AppContractAddress, + &data.AppIConsensusAddress, + &data.EpochLastBlock, + } + + var results []ComputedClaim + _, err = pgx.ForEachRow(rows, scans, func() error { + results = append(results, data) + return nil + }) + return results, err +} + +func (pg *Database) UpdateEpochWithSubmittedClaim( + ctx context.Context, + id uint64, + transaction_hash common.Hash, +) (pgconn.CommandTag, error) { + query := ` + UPDATE + epoch + SET + status = @status, + transaction_hash = @transaction_hash + WHERE + status = @prevStatus AND epoch.id = @id` + + args := pgx.NamedArgs{ + "id": id, + "transaction_hash": transaction_hash, + "status": EpochStatusClaimSubmitted, + "prevStatus": EpochStatusClaimComputed, + } + return pg.db.Exec(ctx, query, args) +}