diff --git a/cmd/cartesi-rollups-evm-reader/root/root.go b/cmd/cartesi-rollups-evm-reader/root/root.go index 7d7f43e28..c3fa96dc4 100644 --- a/cmd/cartesi-rollups-evm-reader/root/root.go +++ b/cmd/cartesi-rollups-evm-reader/root/root.go @@ -4,7 +4,9 @@ package root import ( + "github.com/cartesi/rollups-node/internal/config" "github.com/cartesi/rollups-node/internal/evmreader" + "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/service" "github.com/spf13/cobra" @@ -24,8 +26,11 @@ var ( TelemetryAddress: ":10000", Impl: &readerService, }, - DefaultBlockString: "safe", + EvmReaderPersistentConfig: model.EvmReaderPersistentConfig{ + DefaultBlock: model.DefaultBlockStatusSafe, + }, } + DefaultBlockString = "safe" ) var Cmd = &cobra.Command{ @@ -38,8 +43,8 @@ var Cmd = &cobra.Command{ func init() { createInfo.LoadEnv() - Cmd.Flags().StringVarP(&createInfo.DefaultBlockString, - "default-block", "d", createInfo.DefaultBlockString, + Cmd.Flags().StringVarP(&DefaultBlockString, + "default-block", "d", DefaultBlockString, `Default block to be used when fetching new blocks. One of 'latest', 'safe', 'pending', 'finalized'`) @@ -78,6 +83,12 @@ func init() { } func run(cmd *cobra.Command, args []string) { + if cmd.Flags().Changed("default-block") { + var err error + createInfo.DefaultBlock, err = config.ToDefaultBlockFromString(DefaultBlockString) + cobra.CheckErr(err) + } + ready := make(chan struct{}, 1) cobra.CheckErr(evmreader.Create(&createInfo, &readerService)) readerService.CreateDefaultHandlers("/" + readerService.Name) diff --git a/internal/advancer/advancer.go b/internal/advancer/advancer.go index e205ba28a..743f810d3 100644 --- a/internal/advancer/advancer.go +++ b/internal/advancer/advancer.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "log/slog" "net/http" "time" @@ -43,25 +42,11 @@ type IAdvancerMachines interface { Apps() []Address } -type Advancer struct { - repository IAdvancerRepository - machines IAdvancerMachines -} - type Service struct { service.Service - Advancer - inspector *inspect.Inspector -} - -func New(machines IAdvancerMachines, repository IAdvancerRepository) (*Advancer, error) { - if machines == nil { - return nil, ErrInvalidMachines - } - if repository == nil { - return nil, ErrInvalidRepository - } - return &Advancer{machines: machines, repository: repository}, nil + repository IAdvancerRepository + machines IAdvancerMachines + inspector inspect.Inspector } type CreateInfo struct { @@ -92,29 +77,41 @@ func Create(c *CreateInfo, s *Service) error { return err } - if c.Repository == nil { - c.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value) - if err != nil { - return err + if s.repository == nil { + if c.Repository == nil { + c.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value) + if err != nil { + return err + } } + s.repository = c.Repository } - s.repository = c.Repository - if c.Machines == nil { - c.Machines, err = machines.Load(s.Context, c.Repository, c.MachineServerVerbosity.Value) - if err != nil { - return err + if s.machines == nil { + if c.Machines == nil { + c.Machines, err = machines.Load(s.Context, + c.Repository, c.MachineServerVerbosity.Value, s.Logger) + if err != nil { + return err + } } + s.machines = c.Machines } - s.machines = c.Machines - if s.Service.ServeMux == nil { - if c.CreateInfo.ServeMux == nil { - c.ServeMux = http.NewServeMux() + // allow partial construction for testing + if c.Machines != nil { + s.inspector = inspect.Inspector{ + IInspectMachines: c.Machines, + } + if s.Service.ServeMux == nil { + if c.CreateInfo.ServeMux == nil { + c.ServeMux = http.NewServeMux() + } + s.ServeMux = c.ServeMux } + s.ServeMux.Handle("/inspect/{dapp}", http.Handler(&s.inspector)) + s.ServeMux.Handle("/inspect/{dapp}/{payload}", http.Handler(&s.inspector)) } - s.Service.ServeMux.Handle("/inspect/{dapp}", http.Handler(s.inspector)) - s.Service.ServeMux.Handle("/inspect/{dapp}/{payload}", http.Handler(s.inspector)) return nil } @@ -144,7 +141,7 @@ func (v *Service) String() string { // It gets unprocessed inputs from the repository, // runs them through the cartesi machine, // and updates the repository with the outputs. -func (advancer *Advancer) Step(ctx context.Context) error { +func (advancer *Service) Step(ctx context.Context) error { // Dynamically updates the list of machines err := advancer.machines.UpdateMachines(ctx) if err != nil { @@ -154,7 +151,7 @@ func (advancer *Advancer) Step(ctx context.Context) error { apps := advancer.machines.Apps() // Gets the unprocessed inputs (of all apps) from the repository. - slog.Debug("advancer: querying for unprocessed inputs") + advancer.Logger.Debug("querying for unprocessed inputs") inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps) if err != nil { return err @@ -162,7 +159,7 @@ func (advancer *Advancer) Step(ctx context.Context) error { // Processes each set of inputs. for app, inputs := range inputs { - slog.Debug(fmt.Sprintf("advancer: processing %d input(s) from %v", len(inputs), app)) + advancer.Logger.Debug(fmt.Sprintf("processing %d input(s) from %v", len(inputs), app)) err := advancer.process(ctx, app, inputs) if err != nil { return err @@ -181,7 +178,7 @@ func (advancer *Advancer) Step(ctx context.Context) error { } // process sequentially processes inputs from the the application. -func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error { +func (advancer *Service) process(ctx context.Context, app Address, inputs []*Input) error { // Asserts that the app has an associated machine. machine, exists := advancer.machines.GetAdvanceMachine(app) if !exists { @@ -195,7 +192,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In // FIXME if theres a change in epoch id call update epochs for _, input := range inputs { - slog.Info("advancer: Processing input", "app", app, "id", input.Id, "index", input.Index) + advancer.Logger.Info("Processing input", "app", app, "id", input.Id, "index", input.Index) // Sends the input to the cartesi machine. res, err := machine.Advance(ctx, input.RawData, input.Index) diff --git a/internal/advancer/advancer_test.go b/internal/advancer/advancer_test.go index acc27b539..98c393d8d 100644 --- a/internal/advancer/advancer_test.go +++ b/internal/advancer/advancer_test.go @@ -15,6 +15,7 @@ import ( "github.com/cartesi/rollups-node/internal/advancer/machines" . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/internal/nodemachine" + "github.com/cartesi/rollups-node/pkg/service" "github.com/stretchr/testify/suite" ) @@ -25,41 +26,16 @@ func TestAdvancer(t *testing.T) { type AdvancerSuite struct{ suite.Suite } -func (s *AdvancerSuite) TestNew() { - s.Run("Ok", func() { - require := s.Require() - machines := newMockMachines() - machines.Map[randomAddress()] = &MockMachine{} - var repository IAdvancerRepository = &MockRepository{} - advancer, err := New(machines, repository) - require.NotNil(advancer) - require.Nil(err) - }) - - s.Run("InvalidMachines", func() { - require := s.Require() - var machines IAdvancerMachines = nil - var repository IAdvancerRepository = &MockRepository{} - advancer, err := New(machines, repository) - require.Nil(advancer) - require.Error(err) - require.Equal(ErrInvalidMachines, err) - }) - - s.Run("InvalidRepository", func() { - require := s.Require() - machines := newMockMachines() - machines.Map[randomAddress()] = &MockMachine{} - var repository IAdvancerRepository = nil - advancer, err := New(machines, repository) - require.Nil(advancer) - require.Error(err) - require.Equal(ErrInvalidRepository, err) - }) -} - -func (s *AdvancerSuite) TestPoller() { - s.T().Skip("TODO") +func New(m IAdvancerMachines, r IAdvancerRepository) (*Service, error) { + s := &Service{ + machines: m, + repository: r, + } + return s, Create(&CreateInfo{ + CreateInfo: service.CreateInfo{ + Name: "advancer", + }, + }, s) } func (s *AdvancerSuite) TestRun() { @@ -105,15 +81,15 @@ func (s *AdvancerSuite) TestRun() { } func (s *AdvancerSuite) TestProcess() { - setup := func() (IAdvancerMachines, *MockRepository, *Advancer, Address) { + setup := func() (IAdvancerMachines, *MockRepository, *Service, Address) { + require := s.Require() + app := randomAddress() machines := newMockMachines() machines.Map[app] = &MockMachine{} repository := &MockRepository{} - advancer := &Advancer{ - machines: machines, - repository: repository, - } + advancer, err := New(machines, repository) + require.Nil(err) return machines, repository, advancer, app } diff --git a/internal/advancer/machines/machines.go b/internal/advancer/machines/machines.go index 97eafedb5..14e05f649 100644 --- a/internal/advancer/machines/machines.go +++ b/internal/advancer/machines/machines.go @@ -45,6 +45,7 @@ type Machines struct { machines map[Address]*nm.NodeMachine repository Repository verbosity cm.ServerVerbosity + Logger *slog.Logger } // Load initializes the cartesi machines. @@ -52,7 +53,12 @@ type Machines struct { // // Load does not fail when one of those machines fail to initialize. // It stores the error to be returned later and continues to initialize the other machines. -func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*Machines, error) { +func Load( + ctx context.Context, + repo Repository, + verbosity cm.ServerVerbosity, + logger *slog.Logger, +) (*Machines, error) { configs, err := repo.GetMachineConfigurations(ctx) if err != nil { return nil, err @@ -63,7 +69,7 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (* for _, config := range configs { // Creates the machine. - machine, err := createMachine(ctx, verbosity, config) + machine, err := createMachine(ctx, verbosity, config, logger) if err != nil { err = fmt.Errorf("failed to create machine from snapshot (%v): %w", config, err) errs = errors.Join(errs, err) @@ -71,7 +77,7 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (* } // Advances the machine until it catches up with the state of the database (if necessary). - err = catchUp(ctx, repo, config.AppAddress, machine, config.ProcessedInputs) + err = catchUp(ctx, repo, config.AppAddress, machine, config.ProcessedInputs, logger) if err != nil { err = fmt.Errorf("failed to advance cartesi machine (%v): %w", config, err) errs = errors.Join(errs, err, machine.Close()) @@ -81,7 +87,12 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (* machines[config.AppAddress] = machine } - return &Machines{machines: machines, repository: repo, verbosity: verbosity}, errs + return &Machines{ + machines: machines, + repository: repo, + verbosity: verbosity, + Logger: logger, + }, errs } func (m *Machines) UpdateMachines(ctx context.Context) error { @@ -95,15 +106,15 @@ func (m *Machines) UpdateMachines(ctx context.Context) error { continue } - machine, err := createMachine(ctx, m.verbosity, config) + machine, err := createMachine(ctx, m.verbosity, config, m.Logger) if err != nil { - slog.Error("advancer: Failed to create machine", "app", config.AppAddress, "error", err) + m.Logger.Error("Failed to create machine", "app", config.AppAddress, "error", err) continue } - err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs) + err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs, m.Logger) if err != nil { - slog.Error("Failed to sync the machine", "app", config.AppAddress, "error", err) + m.Logger.Error("Failed to sync the machine", "app", config.AppAddress, "error", err) machine.Close() continue } @@ -158,7 +169,7 @@ func (m *Machines) RemoveAbsent(configs []*MachineConfig) { } for address, machine := range m.machines { if !configMap[address] { - slog.Info("advancer: Application was disabled, shutting down machine", "application", address) + m.Logger.Info("Application was disabled, shutting down machine", "application", address) machine.Close() delete(m.machines, address) } @@ -200,7 +211,7 @@ func (m *Machines) Close() error { err := closeMachines(m.machines) if err != nil { - slog.Error(fmt.Sprintf("failed to close some machines: %v", err)) + m.Logger.Error(fmt.Sprintf("failed to close some machines: %v", err)) } return err } @@ -227,17 +238,18 @@ func closeMachines(machines map[Address]*nm.NodeMachine) (err error) { func createMachine(ctx context.Context, verbosity cm.ServerVerbosity, config *MachineConfig, + logger *slog.Logger, ) (*nm.NodeMachine, error) { - slog.Info("advancer: creating machine", "application", config.AppAddress, + logger.Info("creating machine", "application", config.AppAddress, "template-path", config.SnapshotPath) - slog.Debug("advancer: instantiating remote machine server", "application", config.AppAddress) + logger.Debug("instantiating remote machine server", "application", config.AppAddress) // Starts the server. address, err := cm.StartServer(verbosity, 0, os.Stdout, os.Stderr) if err != nil { return nil, err } - slog.Info("advancer: loading machine on server", "application", config.AppAddress, + logger.Info("loading machine on server", "application", config.AppAddress, "remote-machine", address, "template-path", config.SnapshotPath) // Creates a CartesiMachine from the snapshot. runtimeConfig := &emulator.MachineRuntimeConfig{} @@ -246,7 +258,7 @@ func createMachine(ctx context.Context, return nil, errors.Join(err, cm.StopServer(address)) } - slog.Debug("advancer: machine loaded on server", "application", config.AppAddress, + logger.Debug("machine loaded on server", "application", config.AppAddress, "remote-machine", address, "template-path", config.SnapshotPath) // Creates a RollupsMachine from the CartesiMachine. @@ -276,9 +288,10 @@ func catchUp(ctx context.Context, app Address, machine *nm.NodeMachine, processedInputs uint64, + logger *slog.Logger, ) error { - slog.Info("advancer: catching up unprocessed inputs", "app", app) + logger.Info("catching up unprocessed inputs", "app", app) inputs, err := repo.GetProcessedInputs(ctx, app, processedInputs) if err != nil { @@ -287,7 +300,7 @@ func catchUp(ctx context.Context, for _, input := range inputs { // FIXME epoch id to epoch index - slog.Info("advancer: advancing", "app", app, "epochId", input.EpochId, + logger.Info("advancing", "app", app, "epochId", input.EpochId, "input_index", input.Index) _, err := machine.Advance(ctx, input.RawData, input.Index) if err != nil { diff --git a/internal/claimer/claimer.go b/internal/claimer/claimer.go index af5bf45a9..0bf115647 100644 --- a/internal/claimer/claimer.go +++ b/internal/claimer/claimer.go @@ -71,7 +71,7 @@ type CreateInfo struct { EthConn *ethclient.Client PostgresEndpoint config.Redacted[string] - DBConn *repository.Database + Repository *repository.Database EnableSubmission bool } @@ -80,7 +80,7 @@ type Service struct { service.Service submissionEnabled bool - DBConn *repository.Database + Repository *repository.Database EthConn *ethclient.Client TxOpts *bind.TransactOpts claimsInFlight map[address]hash // -> txHash @@ -93,6 +93,8 @@ func (c *CreateInfo) LoadEnv() { } c.BlockchainHttpEndpoint.Value = config.GetBlockchainHttpEndpoint() c.PostgresEndpoint.Value = config.GetPostgresEndpoint() + c.PollInterval = config.GetClaimerPollingInterval() + c.LogLevel = service.LogLevel(config.GetLogLevel()) } func Create(ci CreateInfo, s *Service) error { @@ -114,14 +116,14 @@ func Create(ci CreateInfo, s *Service) error { s.EthConn = ci.EthConn } - if s.DBConn == nil { - if ci.DBConn == nil { - ci.DBConn, err = repository.Connect(s.Context, ci.PostgresEndpoint.Value) + if s.Repository == nil { + if ci.Repository == nil { + ci.Repository, err = repository.Connect(s.Context, ci.PostgresEndpoint.Value) if err != nil { return err } } - s.DBConn = ci.DBConn + s.Repository = ci.Repository } if s.claimsInFlight == nil { @@ -182,14 +184,14 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { errs = append(errs, err) return errs } - s.Logger.Info("claimer: Claim submitted", + s.Logger.Info("Claim submitted", "app", claim.AppContractAddress, "claim", claim.EpochHash, "last_block", claim.EpochLastBlock, "tx", txHash) delete(currClaims, key) } else { - s.Logger.Warn("claimer: expected claim in flight to be in currClaims.", + s.Logger.Warn("expected claim in flight to be in currClaims.", "tx", receipt.TxHash) } delete(s.claimsInFlight, key) @@ -208,7 +210,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { if prevClaimRow, ok := prevClaims[key]; ok { err := checkClaimsConstraint(&prevClaimRow, &currClaimRow) if err != nil { - s.Logger.Error("claimer: database mismatch", + s.Logger.Error("database mismatch", "prevClaim", prevClaimRow, "currClaim", currClaimRow, "err", err, @@ -227,7 +229,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { goto nextApp } if prevEvent == nil { - s.Logger.Error("claimer: missing event", + s.Logger.Error("missing event", "claim", prevClaimRow, "err", ErrMissingEvent, ) @@ -236,7 +238,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { goto nextApp } if !claimMatchesEvent(&prevClaimRow, prevEvent) { - s.Logger.Error("claimer: event mismatch", + s.Logger.Error("event mismatch", "claim", prevClaimRow, "event", prevEvent, "err", ErrEventMismatch, @@ -258,7 +260,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { if currEvent != nil { if !claimMatchesEvent(&currClaimRow, currEvent) { - s.Logger.Error("claimer: event mismatch", + s.Logger.Error("event mismatch", "claim", currClaimRow, "event", currEvent, "err", ErrEventMismatch, @@ -282,7 +284,7 @@ func (s *Service) submitClaimsAndUpdateDatabase(se sideEffects) []error { errs = append(errs, err) goto nextApp } - s.Logger.Info("claimer: Submitting claim to blockchain", + s.Logger.Info("Submitting claim to blockchain", "app", currClaimRow.AppContractAddress, "claim", currClaimRow.EpochHash, "last_block", currClaimRow.EpochLastBlock, diff --git a/internal/claimer/side-effects.go b/internal/claimer/side-effects.go index 2b243072b..9ff7853f8 100644 --- a/internal/claimer/side-effects.go +++ b/internal/claimer/side-effects.go @@ -52,7 +52,7 @@ func (s *Service) selectClaimPairsPerApp() ( map[address]claimRow, error, ) { - computed, accepted, err := s.DBConn.SelectClaimPairsPerApp(s.Context) + computed, accepted, err := s.Repository.SelectClaimPairsPerApp(s.Context) if err != nil { s.Logger.Error("selectClaimPairsPerApp:failed", "service", s.Name, @@ -71,7 +71,7 @@ func (s *Service) updateEpochWithSubmittedClaim( claim *claimRow, txHash hash, ) error { - err := s.DBConn.UpdateEpochWithSubmittedClaim(s.Context, claim.EpochID, txHash) + err := s.Repository.UpdateEpochWithSubmittedClaim(s.Context, claim.EpochID, txHash) if err != nil { s.Logger.Error("updateEpochWithSubmittedClaim:failed", "service", s.Name, diff --git a/internal/evmreader/claim.go b/internal/evmreader/claim.go index a82688426..c0d257dcd 100644 --- a/internal/evmreader/claim.go +++ b/internal/evmreader/claim.go @@ -6,7 +6,6 @@ package evmreader import ( "cmp" "context" - "log/slog" . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" @@ -14,13 +13,13 @@ import ( "github.com/ethereum/go-ethereum/common" ) -func (r *EvmReader) checkForClaimStatus( +func (r *Service) checkForClaimStatus( ctx context.Context, apps []application, mostRecentBlockNumber uint64, ) { - slog.Debug("evmreader: Checking for new Claim Acceptance Events") + r.Logger.Debug("Checking for new Claim Acceptance Events") // Classify them by lastClaimCheck block appsIndexedByLastCheck := indexApps(keyByLastClaimCheck, apps) @@ -37,7 +36,7 @@ func (r *EvmReader) checkForClaimStatus( if mostRecentBlockNumber > lastClaimCheck { - slog.Debug("evmreader: Checking claim acceptance for applications", + r.Logger.Debug("Checking claim acceptance for applications", "apps", appAddresses, "last claim check block", lastClaimCheck, "most recent block", mostRecentBlockNumber) @@ -45,14 +44,14 @@ func (r *EvmReader) checkForClaimStatus( r.readAndUpdateClaims(ctx, apps, lastClaimCheck, mostRecentBlockNumber) } else if mostRecentBlockNumber < lastClaimCheck { - slog.Warn( - "evmreader: Not reading claim acceptance: most recent block is lower than the last processed one", //nolint:lll + r.Logger.Warn( + "Not reading claim acceptance: most recent block is lower than the last processed one", //nolint:lll "apps", appAddresses, "last claim check block", lastClaimCheck, "most recent block", mostRecentBlockNumber, ) } else { - slog.Warn("evmreader: Not reading claim acceptance: already checked the most recent blocks", + r.Logger.Warn("Not reading claim acceptance: already checked the most recent blocks", "apps", appAddresses, "last claim check block", lastClaimCheck, "most recent block", mostRecentBlockNumber, @@ -62,7 +61,7 @@ func (r *EvmReader) checkForClaimStatus( } } -func (r *EvmReader) readAndUpdateClaims( +func (r *Service) readAndUpdateClaims( ctx context.Context, apps []application, lastClaimCheck, mostRecentBlockNumber uint64, @@ -90,7 +89,7 @@ func (r *EvmReader) readAndUpdateClaims( appClaimAcceptanceEventMap, err := r.readClaimsAcceptance( ctx, consensusContract, appAddresses, lastClaimCheck+1, mostRecentBlockNumber) if err != nil { - slog.Error("evmreader: Error reading claim acceptance status", + r.Logger.Error("Error reading claim acceptance status", "apps", apps, "IConsensus", iConsensusAddress, "start", lastClaimCheck, @@ -110,14 +109,14 @@ func (r *EvmReader) readAndUpdateClaims( previousEpochs, err := r.repository.GetPreviousEpochsWithOpenClaims( ctx, app, claimAcceptance.LastProcessedBlockNumber.Uint64()) if err != nil { - slog.Error("evmreader: Error retrieving previous submitted claims", + r.Logger.Error("Error retrieving previous submitted claims", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "error", err) continue APP_LOOP } if len(previousEpochs) > 0 { - slog.Error("evmreader: Application got 'not accepted' claims. It is in an invalid state", + r.Logger.Error("Application got 'not accepted' claims. It is in an invalid state", "claim last block", claimAcceptance.LastProcessedBlockNumber, "app", app) continue APP_LOOP @@ -130,7 +129,7 @@ func (r *EvmReader) readAndUpdateClaims( claimAcceptance.LastProcessedBlockNumber.Uint64()), app) if err != nil { - slog.Error("evmreader: Error retrieving Epoch", + r.Logger.Error("Error retrieving Epoch", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "error", err) @@ -139,16 +138,16 @@ func (r *EvmReader) readAndUpdateClaims( // Check Epoch if epoch == nil { - slog.Error( - "evmreader: Found claim acceptance event for an unknown epoch. Application is in an invalid state", //nolint:lll + r.Logger.Error( + "Found claim acceptance event for an unknown epoch. Application is in an invalid state", //nolint:lll "app", app, "claim last block", claimAcceptance.LastProcessedBlockNumber, "hash", claimAcceptance.Claim) continue APP_LOOP } if epoch.ClaimHash == nil { - slog.Warn( - "evmreader: Found claim acceptance event, but claim hasn't been calculated yet", + r.Logger.Warn( + "Found claim acceptance event, but claim hasn't been calculated yet", "app", app, "lastBlock", claimAcceptance.LastProcessedBlockNumber, ) @@ -156,7 +155,7 @@ func (r *EvmReader) readAndUpdateClaims( } if claimAcceptance.Claim != *epoch.ClaimHash || claimAcceptance.LastProcessedBlockNumber.Uint64() != epoch.LastBlock { - slog.Error("evmreader: Accepted Claim does not match actual Claim. Application is in an invalid state", //nolint:lll + r.Logger.Error("Accepted Claim does not match actual Claim. Application is in an invalid state", //nolint:lll "app", app, "lastBlock", epoch.LastBlock, "hash", epoch.ClaimHash) @@ -164,7 +163,7 @@ func (r *EvmReader) readAndUpdateClaims( continue APP_LOOP } if epoch.Status == EpochStatusClaimAccepted { - slog.Debug("evmreader: Claim already accepted. Skipping", + r.Logger.Debug("Claim already accepted. Skipping", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "claimStatus", epoch.Status, @@ -174,7 +173,7 @@ func (r *EvmReader) readAndUpdateClaims( if epoch.Status != EpochStatusClaimSubmitted { // this happens when running on latest. EvmReader can see the event before // the claim is marked as submitted by the claimer. - slog.Debug("evmreader: Claim status is not submitted. Skipping for now", + r.Logger.Debug("Claim status is not submitted. Skipping for now", "app", app, "block", claimAcceptance.LastProcessedBlockNumber.Uint64(), "claimStatus", epoch.Status, @@ -183,7 +182,7 @@ func (r *EvmReader) readAndUpdateClaims( } // Update Epoch claim status - slog.Info("evmreader: Claim Accepted", + r.Logger.Info("Claim Accepted", "app", app, "lastBlock", epoch.LastBlock, "hash", epoch.ClaimHash, @@ -195,7 +194,7 @@ func (r *EvmReader) readAndUpdateClaims( err = r.repository.UpdateEpochs( ctx, app, []*Epoch{epoch}, claimAcceptance.Raw.BlockNumber) if err != nil { - slog.Error("evmreader: Error storing claims", "app", app, "error", err) + r.Logger.Error("Error storing claims", "app", app, "error", err) continue } } @@ -204,7 +203,7 @@ func (r *EvmReader) readAndUpdateClaims( } } -func (r *EvmReader) readClaimsAcceptance( +func (r *Service) readClaimsAcceptance( ctx context.Context, consensusContract ConsensusContract, appAddresses []common.Address, diff --git a/internal/evmreader/claim_test.go b/internal/evmreader/claim_test.go index ef9311e3f..30f1fc668 100644 --- a/internal/evmreader/claim_test.go +++ b/internal/evmreader/claim_test.go @@ -12,6 +12,7 @@ import ( . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/mock" ) @@ -21,15 +22,17 @@ func (s *EvmReaderSuite) TestNoClaimsAcceptance() { wsClient := FakeWSEhtClient{} //New EVM Reader - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := &Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -154,15 +157,17 @@ func (s *EvmReaderSuite) TestReadClaimAcceptance() { //New EVM Reader wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events @@ -311,15 +316,17 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events @@ -458,15 +465,17 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events @@ -605,15 +614,17 @@ func (s *EvmReaderSuite) TestCheckClaimFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Claims Acceptance Events diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index 1c4257b96..7f419c418 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "log/slog" "math/big" "time" @@ -30,10 +29,8 @@ import ( type CreateInfo struct { service.CreateInfo - model.EvmReaderPersistentConfig - DefaultBlockString string PostgresEndpoint config.Redacted[string] BlockchainHttpEndpoint config.Redacted[string] BlockchainWsEndpoint config.Redacted[string] @@ -44,7 +41,16 @@ type CreateInfo struct { type Service struct { service.Service - reader EvmReader + + client EthClient + wsClient EthWsClient + inputSource InputSource + repository EvmReaderRepository + contractFactory ContractFactory + inputBoxDeploymentBlock uint64 + defaultBlock DefaultBlock + epochLengthCache map[Address]uint64 + hasEnabledApps bool } func (c *CreateInfo) LoadEnv() { @@ -53,6 +59,7 @@ func (c *CreateInfo) LoadEnv() { c.MaxDelay = config.GetEvmReaderRetryPolicyMaxDelay() c.MaxRetries = config.GetEvmReaderRetryPolicyMaxRetries() c.PostgresEndpoint.Value = config.GetPostgresEndpoint() + c.LogLevel = service.LogLevel(config.GetLogLevel()) // persistent c.DefaultBlock = config.GetEvmReaderDefaultBlock() @@ -69,11 +76,6 @@ func Create(c *CreateInfo, s *Service) error { return err } - c.DefaultBlock, err = config.ToDefaultBlockFromString(c.DefaultBlockString) - if err != nil { - return err - } - client, err := ethclient.DialContext(s.Context, c.BlockchainHttpEndpoint.Value) if err != nil { return err @@ -103,15 +105,14 @@ func Create(c *CreateInfo, s *Service) error { contractFactory := NewEvmReaderContractFactory(client, c.MaxRetries, c.MaxDelay) - s.reader = NewEvmReader( - NewEhtClientWithRetryPolicy(client, c.MaxRetries, c.MaxDelay), - NewEthWsClientWithRetryPolicy(wsClient, c.MaxRetries, c.MaxDelay), - NewInputSourceWithRetryPolicy(inputSource, c.MaxRetries, c.MaxDelay), - c.Database, - c.InputBoxDeploymentBlock, - c.DefaultBlock, - contractFactory, - ) + s.client = NewEhtClientWithRetryPolicy(client, c.MaxRetries, c.MaxDelay) + s.wsClient = NewEthWsClientWithRetryPolicy(wsClient, c.MaxRetries, c.MaxDelay) + s.inputSource = NewInputSourceWithRetryPolicy(inputSource, c.MaxRetries, c.MaxDelay) + s.repository = c.Database + s.inputBoxDeploymentBlock = c.InputBoxDeploymentBlock + s.defaultBlock = c.DefaultBlock + s.contractFactory = contractFactory + s.hasEnabledApps = true return nil } @@ -136,7 +137,7 @@ func (s *Service) Tick() []error { } func (s *Service) Start(context context.Context, ready chan<- struct{}) error { - go s.reader.Run(s.Context, ready) + go s.Run(s.Context, ready) return s.Serve() } func (s *Service) String() string { @@ -245,47 +246,7 @@ type application struct { consensusContract ConsensusContract } -// EvmReader reads Input Added, Claim Submitted and -// Output Executed events from the blockchain -type EvmReader struct { - client EthClient - wsClient EthWsClient - inputSource InputSource - repository EvmReaderRepository - contractFactory ContractFactory - inputBoxDeploymentBlock uint64 - defaultBlock DefaultBlock - epochLengthCache map[Address]uint64 - hasEnabledApps bool -} - -func (r *EvmReader) String() string { - return "evmreader" -} - -// Creates a new EvmReader -func NewEvmReader( - client EthClient, - wsClient EthWsClient, - inputSource InputSource, - repository EvmReaderRepository, - inputBoxDeploymentBlock uint64, - defaultBlock DefaultBlock, - contractFactory ContractFactory, -) EvmReader { - return EvmReader{ - client: client, - wsClient: wsClient, - inputSource: inputSource, - repository: repository, - inputBoxDeploymentBlock: inputBoxDeploymentBlock, - defaultBlock: defaultBlock, - contractFactory: contractFactory, - hasEnabledApps: true, - } -} - -func (r *EvmReader) Run(ctx context.Context, ready chan<- struct{}) error { +func (r *Service) Run(ctx context.Context, ready chan<- struct{}) error { // Initialize epochLength cache r.epochLengthCache = make(map[Address]uint64) @@ -297,20 +258,20 @@ func (r *EvmReader) Run(ctx context.Context, ready chan<- struct{}) error { if _, ok := err.(*SubscriptionError); !ok { return err } - slog.Error(err.Error()) - slog.Info("evmreader: Restarting subscription") + r.Logger.Error(err.Error()) + r.Logger.Info("Restarting subscription") } } // watchForNewBlocks watches for new blocks and reads new inputs based on the // default block configuration, which have not been processed yet. -func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{}) error { +func (r *Service) watchForNewBlocks(ctx context.Context, ready chan<- struct{}) error { headers := make(chan *types.Header) sub, err := r.wsClient.SubscribeNewHead(ctx, headers) if err != nil { return fmt.Errorf("could not start subscription: %v", err) } - slog.Info("evmreader: Subscribed to new block events") + r.Logger.Info("Subscribed to new block events") ready <- struct{}{} defer sub.Unsubscribe() @@ -323,13 +284,13 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} case header := <-headers: // Every time a new block arrives - slog.Debug("evmreader: New block header received", "blockNumber", header.Number, "blockHash", header.Hash()) + r.Logger.Debug("New block header received", "blockNumber", header.Number, "blockHash", header.Hash()) - slog.Debug("evmreader: Retrieving enabled applications") + r.Logger.Debug("Retrieving enabled applications") // Get All Applications runningApps, err := r.repository.GetAllRunningApplications(ctx) if err != nil { - slog.Error("evmreader: Error retrieving running applications", + r.Logger.Error("Error retrieving running applications", "error", err, ) @@ -338,13 +299,13 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} if len(runningApps) == 0 { if r.hasEnabledApps { - slog.Info("evmreader: No registered applications enabled") + r.Logger.Info("No registered applications enabled") } r.hasEnabledApps = false continue } if !r.hasEnabledApps { - slog.Info("evmreader: Found enabled applications") + r.Logger.Info("Found enabled applications") } r.hasEnabledApps = true @@ -353,7 +314,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} for _, app := range runningApps { applicationContract, consensusContract, err := r.getAppContracts(app) if err != nil { - slog.Error("evmreader: Error retrieving application contracts", "app", app, "error", err) + r.Logger.Error("Error retrieving application contracts", "app", app, "error", err) continue } apps = append(apps, application{Application: app, @@ -362,7 +323,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} } if len(apps) == 0 { - slog.Info("evmreader: No correctly configured applications running") + r.Logger.Info("No correctly configured applications running") continue } @@ -373,14 +334,14 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} r.defaultBlock, ) if err != nil { - slog.Error("evmreader: Error fetching most recent block", + r.Logger.Error("Error fetching most recent block", "default block", r.defaultBlock, "error", err) continue } blockNumber = mostRecentHeader.Number.Uint64() - slog.Debug(fmt.Sprintf("evmreader: Using block %d and not %d because of commitment policy: %s", + r.Logger.Debug(fmt.Sprintf("Using block %d and not %d because of commitment policy: %s", mostRecentHeader.Number.Uint64(), header.Number.Uint64(), r.defaultBlock)) } @@ -396,7 +357,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} // fetchMostRecentHeader fetches the most recent header up till the // given default block -func (r *EvmReader) fetchMostRecentHeader( +func (r *Service) fetchMostRecentHeader( ctx context.Context, defaultBlock DefaultBlock, ) (*types.Header, error) { @@ -431,7 +392,7 @@ func (r *EvmReader) fetchMostRecentHeader( // getAppContracts retrieves the ApplicationContract and ConsensusContract for a given Application. // Also validates if IConsensus configuration matches the blockchain registered one -func (r *EvmReader) getAppContracts(app Application, +func (r *Service) getAppContracts(app Application, ) (ApplicationContract, ConsensusContract, error) { applicationContract, err := r.contractFactory.NewApplication(app.ContractAddress) if err != nil { diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index 7a4a80753..b12dc0c22 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -17,6 +17,7 @@ import ( appcontract "github.com/cartesi/rollups-node/pkg/contracts/iapplication" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -73,7 +74,7 @@ type EvmReaderSuite struct { wsClient *MockEthClient inputBox *MockInputBox repository *MockRepository - evmReader *EvmReader + evmReader Service contractFactory *MockEvmReaderContractFactory } @@ -108,22 +109,24 @@ func (s *EvmReaderSuite) TearDownSuite() { } func (s *EvmReaderSuite) SetupTest() { - s.client = newMockEthClient() s.wsClient = s.client s.inputBox = newMockInputBox() s.repository = newMockRepository() s.contractFactory = newEmvReaderContractFactory() - inputReader := NewEvmReader( - s.client, - s.wsClient, - s.inputBox, - s.repository, - 0, - DefaultBlockStatusLatest, - s.contractFactory, - ) - s.evmReader = &inputReader + s.evmReader = Service{ + client: s.client, + wsClient: s.wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{ + Name: "evm-reader", + }, &s.evmReader.Service) } // Service tests @@ -170,27 +173,25 @@ func (s *EvmReaderSuite) TestItFailsToSubscribeForNewInputsOnStart() { } func (s *EvmReaderSuite) TestItWrongIConsensus() { - consensusContract := &MockIConsensusContract{} - contractFactory := newEmvReaderContractFactory() - contractFactory.Unset("NewIConsensus") contractFactory.On("NewIConsensus", mock.Anything, ).Return(consensusContract, nil) wsClient := FakeWSEhtClient{} - - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare consensus claimEvent0 := &iconsensus.IConsensusClaimAcceptance{ diff --git a/internal/evmreader/input.go b/internal/evmreader/input.go index 9f169aa03..1a7bdda46 100644 --- a/internal/evmreader/input.go +++ b/internal/evmreader/input.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "log/slog" . "github.com/cartesi/rollups-node/internal/model" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -15,13 +14,13 @@ import ( ) // checkForNewInputs checks if is there new Inputs for all running Applications -func (r *EvmReader) checkForNewInputs( +func (r *Service) checkForNewInputs( ctx context.Context, apps []application, mostRecentBlockNumber uint64, ) { - slog.Debug("evmreader: Checking for new inputs") + r.Logger.Debug("Checking for new inputs") groupedApps := indexApps(byLastProcessedBlock, apps) @@ -37,7 +36,7 @@ func (r *EvmReader) checkForNewInputs( if mostRecentBlockNumber > lastProcessedBlock { - slog.Debug("evmreader: Checking inputs for applications", + r.Logger.Debug("Checking inputs for applications", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, @@ -49,7 +48,7 @@ func (r *EvmReader) checkForNewInputs( apps, ) if err != nil { - slog.Error("Error reading inputs", + r.Logger.Error("Error reading inputs", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, @@ -58,14 +57,14 @@ func (r *EvmReader) checkForNewInputs( continue } } else if mostRecentBlockNumber < lastProcessedBlock { - slog.Warn( - "evmreader: Not reading inputs: most recent block is lower than the last processed one", + r.Logger.Warn( + "Not reading inputs: most recent block is lower than the last processed one", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, ) } else { - slog.Info("evmreader: Not reading inputs: already checked the most recent blocks", + r.Logger.Info("Not reading inputs: already checked the most recent blocks", "apps", appAddresses, "last processed block", lastProcessedBlock, "most recent block", mostRecentBlockNumber, @@ -76,7 +75,7 @@ func (r *EvmReader) checkForNewInputs( // readAndStoreInputs reads, inputs from the InputSource given specific filter options, indexes // them into epochs and store the indexed inputs and epochs -func (r *EvmReader) readAndStoreInputs( +func (r *Service) readAndStoreInputs( ctx context.Context, startBlock uint64, endBlock uint64, @@ -89,7 +88,7 @@ func (r *EvmReader) readAndStoreInputs( // Get App EpochLength err := r.addAppEpochLengthIntoCache(app) if err != nil { - slog.Error("evmreader: Error adding epoch length into cache", + r.Logger.Error("Error adding epoch length into cache", "app", app.ContractAddress, "error", err) continue @@ -100,7 +99,7 @@ func (r *EvmReader) readAndStoreInputs( } if len(appsToProcess) == 0 { - slog.Warn("evmreader: No valid running applications") + r.Logger.Warn("No valid running applications") return nil } @@ -122,7 +121,7 @@ func (r *EvmReader) readAndStoreInputs( currentEpoch, err := r.repository.GetEpoch(ctx, calculateEpochIndex(epochLength, startBlock), address) if err != nil { - slog.Error("evmreader: Error retrieving existing current epoch", + r.Logger.Error("Error retrieving existing current epoch", "app", address, "error", err, ) @@ -131,7 +130,7 @@ func (r *EvmReader) readAndStoreInputs( // Check current epoch status if currentEpoch != nil && currentEpoch.Status != EpochStatusOpen { - slog.Error("evmreader: Current epoch is not open", + r.Logger.Error("Current epoch is not open", "app", address, "epoch_index", currentEpoch.Index, "status", currentEpoch.Status, @@ -150,7 +149,7 @@ func (r *EvmReader) readAndStoreInputs( // If input belongs into a new epoch, close the previous known one if currentEpoch != nil && currentEpoch.Index != inputEpochIndex { currentEpoch.Status = EpochStatusClosed - slog.Info("evmreader: Closing epoch", + r.Logger.Info("Closing epoch", "app", currentEpoch.AppAddress, "epoch_index", currentEpoch.Index, "start", currentEpoch.FirstBlock, @@ -168,7 +167,7 @@ func (r *EvmReader) readAndStoreInputs( epochInputMap[currentEpoch] = []Input{} } - slog.Info("evmreader: Found new Input", + r.Logger.Info("Found new Input", "app", address, "index", input.Index, "block", input.BlockNumber, @@ -185,7 +184,7 @@ func (r *EvmReader) readAndStoreInputs( // Indexed all inputs. Check if it is time to close this epoch if currentEpoch != nil && endBlock >= currentEpoch.LastBlock { currentEpoch.Status = EpochStatusClosed - slog.Info("evmreader: Closing epoch", + r.Logger.Info("Closing epoch", "app", currentEpoch.AppAddress, "epoch_index", currentEpoch.Index, "start", currentEpoch.FirstBlock, @@ -204,7 +203,7 @@ func (r *EvmReader) readAndStoreInputs( address, ) if err != nil { - slog.Error("evmreader: Error storing inputs and epochs", + r.Logger.Error("Error storing inputs and epochs", "app", address, "error", err, ) @@ -214,7 +213,7 @@ func (r *EvmReader) readAndStoreInputs( // Store everything if len(epochInputMap) > 0 { - slog.Debug("evmreader: Inputs and epochs stored successfully", + r.Logger.Debug("Inputs and epochs stored successfully", "app", address, "start-block", startBlock, "end-block", endBlock, @@ -222,7 +221,7 @@ func (r *EvmReader) readAndStoreInputs( "total inputs", len(inputs), ) } else { - slog.Debug("evmreader: No inputs or epochs to store") + r.Logger.Debug("No inputs or epochs to store") } } @@ -232,7 +231,7 @@ func (r *EvmReader) readAndStoreInputs( // addAppEpochLengthIntoCache checks the epoch length cache and read epoch length from IConsensus // contract and add it to the cache if needed -func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { +func (r *Service) addAppEpochLengthIntoCache(app application) error { epochLength, ok := r.epochLengthCache[app.ContractAddress] if !ok { @@ -245,11 +244,11 @@ func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { err) } r.epochLengthCache[app.ContractAddress] = epochLength - slog.Info("evmreader: Got epoch length from IConsensus", + r.Logger.Info("Got epoch length from IConsensus", "app", app.ContractAddress, "epoch length", epochLength) } else { - slog.Debug("evmreader: Got epoch length from cache", + r.Logger.Debug("Got epoch length from cache", "app", app.ContractAddress, "epoch length", epochLength) } @@ -258,7 +257,7 @@ func (r *EvmReader) addAppEpochLengthIntoCache(app application) error { } // readInputsFromBlockchain read the inputs from the blockchain ordered by Input index -func (r *EvmReader) readInputsFromBlockchain( +func (r *Service) readInputsFromBlockchain( ctx context.Context, appsAddresses []Address, startBlock, endBlock uint64, @@ -282,7 +281,7 @@ func (r *EvmReader) readInputsFromBlockchain( // Order inputs as order is not enforced by RetrieveInputs method nor the APIs for _, event := range inputsEvents { - slog.Debug("evmreader: Received input", + r.Logger.Debug("Received input", "app", event.AppContract, "index", event.Index, "block", event.Raw.BlockNumber) diff --git a/internal/evmreader/input_test.go b/internal/evmreader/input_test.go index 918a21f65..1b6c9ccad 100644 --- a/internal/evmreader/input_test.go +++ b/internal/evmreader/input_test.go @@ -8,6 +8,7 @@ import ( . "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/mock" @@ -17,15 +18,17 @@ func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() { wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -125,15 +128,17 @@ func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -233,15 +238,17 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { wsClient := FakeWSEhtClient{} - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + inputReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &inputReader.Service) // Prepare Client s.client.Unset("HeaderByNumber") @@ -327,15 +334,17 @@ func (s *EvmReaderSuite) TestItReadsMultipleInputsFromSingleNewBlock() { func (s *EvmReaderSuite) TestItStartsWhenLasProcessedBlockIsTheMostRecentBlock() { wsClient := FakeWSEhtClient{} - inputReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + inputReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &inputReader.Service) // Prepare Client s.client.Unset("HeaderByNumber") diff --git a/internal/evmreader/output.go b/internal/evmreader/output.go index 5723a26c2..efd8be362 100644 --- a/internal/evmreader/output.go +++ b/internal/evmreader/output.go @@ -12,7 +12,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" ) -func (r *EvmReader) checkForOutputExecution( +func (r *Service) checkForOutputExecution( ctx context.Context, apps []application, mostRecentBlockNumber uint64, @@ -59,7 +59,7 @@ func (r *EvmReader) checkForOutputExecution( } -func (r *EvmReader) readAndUpdateOutputs( +func (r *Service) readAndUpdateOutputs( ctx context.Context, app application, lastOutputCheck, mostRecentBlockNumber uint64) { contract := app.applicationContract diff --git a/internal/evmreader/output_test.go b/internal/evmreader/output_test.go index c05a6c585..5c8b2083c 100644 --- a/internal/evmreader/output_test.go +++ b/internal/evmreader/output_test.go @@ -11,6 +11,7 @@ import ( . "github.com/cartesi/rollups-node/internal/model" appcontract "github.com/cartesi/rollups-node/pkg/contracts/iapplication" "github.com/cartesi/rollups-node/pkg/contracts/iinputbox" + "github.com/cartesi/rollups-node/pkg/service" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/stretchr/testify/mock" @@ -21,15 +22,17 @@ func (s *EvmReaderSuite) TestOutputExecution() { wsClient := FakeWSEhtClient{} //New EVM Reader - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x10, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: s.contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare repository s.repository.Unset("GetAllRunningApplications") @@ -151,15 +154,17 @@ func (s *EvmReaderSuite) TestReadOutputExecution() { //New EVM Reader wsClient := FakeWSEhtClient{} - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: s.client, + wsClient: &wsClient, + inputSource: s.inputBox, + repository: s.repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Output Executed Events outputExecution0 := &appcontract.IApplicationOutputExecuted{ @@ -284,15 +289,17 @@ func (s *EvmReaderSuite) TestCheckOutputFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) applicationContract.On("RetrieveOutputExecutionEvents", mock.Anything, @@ -396,15 +403,17 @@ func (s *EvmReaderSuite) TestCheckOutputFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Output Executed Events outputExecution0 := &appcontract.IApplicationOutputExecuted{ @@ -513,15 +522,17 @@ func (s *EvmReaderSuite) TestCheckOutputFails() { wsClient := FakeWSEhtClient{} inputBox := newMockInputBox() repository := newMockRepository() - evmReader := NewEvmReader( - client, - &wsClient, - inputBox, - repository, - 0x00, - DefaultBlockStatusLatest, - contractFactory, - ) + evmReader := Service{ + client: client, + wsClient: &wsClient, + inputSource: inputBox, + repository: repository, + inputBoxDeploymentBlock: 0x00, + defaultBlock: DefaultBlockStatusLatest, + contractFactory: contractFactory, + hasEnabledApps: true, + } + service.Create(&service.CreateInfo{}, &evmReader.Service) // Prepare Output Executed Events outputExecution0 := &appcontract.IApplicationOutputExecuted{ diff --git a/internal/node/services.go b/internal/node/services.go index 1ab25480d..c998206a7 100644 --- a/internal/node/services.go +++ b/internal/node/services.go @@ -5,12 +5,14 @@ package node import ( "fmt" + "log/slog" "net/http" advancerservice "github.com/cartesi/rollups-node/internal/advancer" claimerservice "github.com/cartesi/rollups-node/internal/claimer" "github.com/cartesi/rollups-node/internal/config" readerservice "github.com/cartesi/rollups-node/internal/evmreader" + "github.com/cartesi/rollups-node/internal/model" "github.com/cartesi/rollups-node/internal/repository" "github.com/cartesi/rollups-node/internal/services" validatorservice "github.com/cartesi/rollups-node/internal/validator" @@ -60,94 +62,94 @@ func newHttpService(c config.NodeConfig, serveMux *http.ServeMux) services.Servi } } -func newEvmReaderService(c config.NodeConfig, database *repository.Database) services.Service { - readerService := readerservice.Service{} - createInfo := readerservice.CreateInfo{ +func newEvmReaderService(cfg config.NodeConfig, database *repository.Database) services.Service { + s := readerservice.Service{} + c := readerservice.CreateInfo{ CreateInfo: service.CreateInfo{ - Name: "reader", - Impl: &readerService, + Name: "evm-reader", + Impl: &s, ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), + }, + EvmReaderPersistentConfig: model.EvmReaderPersistentConfig{ + DefaultBlock: model.DefaultBlockStatusSafe, }, Database: database, } + c.LoadEnv() - err := readerservice.Create(&createInfo, &readerService) + err := readerservice.Create(&c, &s) if err != nil { - readerService.Logger.Error("Fatal", - "service", readerService.Name, + slog.Error("Fatal", + "service", c.Name, "error", err) + panic(err) } - return &readerService + return &s } -func newAdvancerService(c config.NodeConfig, database *repository.Database, serveMux *http.ServeMux) services.Service { - advancerService := advancerservice.Service{} - createInfo := advancerservice.CreateInfo{ +func newAdvancerService(cfg config.NodeConfig, database *repository.Database, serveMux *http.ServeMux) services.Service { + s := advancerservice.Service{} + c := advancerservice.CreateInfo{ CreateInfo: service.CreateInfo{ Name: "advancer", - PollInterval: c.AdvancerPollingInterval, - Impl: &advancerService, + Impl: &s, ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), ServeMux: serveMux, }, Repository: database, } + c.LoadEnv() - err := advancerservice.Create(&createInfo, &advancerService) + err := advancerservice.Create(&c, &s) if err != nil { - advancerService.Logger.Error("Fatal", - "service", advancerService.Name, + slog.Error("Fatal", + "service", c.Name, "error", err) + panic(err) } - return &advancerService + return &s } -func newValidatorService(c config.NodeConfig, database *repository.Database) services.Service { - validatorService := validatorservice.Service{} - createInfo := validatorservice.CreateInfo{ +func newValidatorService(cfg config.NodeConfig, database *repository.Database) services.Service { + s := validatorservice.Service{} + c := validatorservice.CreateInfo{ CreateInfo: service.CreateInfo{ Name: "validator", - PollInterval: c.ValidatorPollingInterval, - Impl: &validatorService, + Impl: &s, ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), }, Repository: database, } + c.LoadEnv() - err := validatorservice.Create(createInfo, &validatorService) + err := validatorservice.Create(c, &s) if err != nil { - validatorService.Logger.Error("Fatal", - "service", validatorService.Name, + slog.Error("Fatal", + "service", c.Name, "error", err) + panic(err) } - return &validatorService + return &s } -func newClaimerService(c config.NodeConfig, database *repository.Database) services.Service { - claimerService := claimerservice.Service{} - createInfo := claimerservice.CreateInfo{ - Auth: c.Auth, - DBConn: database, - PostgresEndpoint: c.PostgresEndpoint, - BlockchainHttpEndpoint: c.BlockchainHttpEndpoint, - EnableSubmission: c.FeatureClaimSubmissionEnabled, +func newClaimerService(cfg config.NodeConfig, database *repository.Database) services.Service { + s := claimerservice.Service{} + c := claimerservice.CreateInfo{ CreateInfo: service.CreateInfo{ Name: "claimer", - PollInterval: c.ClaimerPollingInterval, - Impl: &claimerService, + Impl: &s, ProcOwner: true, // TODO: Remove this after updating supervisor - LogLevel: service.LogLevel(c.LogLevel), }, + Repository: database, } + c.LoadEnv() - err := claimerservice.Create(createInfo, &claimerService) + err := claimerservice.Create(c, &s) if err != nil { - claimerService.Logger.Error("Fatal", - "service", claimerService.Name, + slog.Error("Fatal", + "service", c.Name, "error", err) + panic(err) } - return &claimerService + return &s } diff --git a/internal/services/poller/poller.go b/internal/services/poller/poller.go deleted file mode 100644 index 174f63fb1..000000000 --- a/internal/services/poller/poller.go +++ /dev/null @@ -1,53 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -package poller - -import ( - "context" - "errors" - "fmt" - "log/slog" - "time" -) - -type Service interface { - Step(context.Context) error -} - -type Poller struct { - name string - service Service - ticker *time.Ticker -} - -var ErrInvalidPollingInterval = errors.New("polling interval must be greater than zero") - -func New(name string, service Service, pollingInterval time.Duration) (*Poller, error) { - if pollingInterval <= 0 { - return nil, ErrInvalidPollingInterval - } - ticker := time.NewTicker(pollingInterval) - return &Poller{name: name, service: service, ticker: ticker}, nil -} - -func (poller *Poller) Start(ctx context.Context) error { - slog.Debug(fmt.Sprintf("%s: poller started", poller.name)) - - for { - // Runs the service's inner routine. - err := poller.service.Step(ctx) - if err != nil { - return err - } - - // Waits for the polling interval to elapse (or for the context to be canceled). - select { - case <-poller.ticker.C: - continue - case <-ctx.Done(): - poller.ticker.Stop() - return nil - } - } -} diff --git a/internal/validator/validator.go b/internal/validator/validator.go index 613db042d..6f513d461 100644 --- a/internal/validator/validator.go +++ b/internal/validator/validator.go @@ -8,7 +8,6 @@ package validator import ( "context" "fmt" - "log/slog" "time" "github.com/cartesi/rollups-node/internal/config" @@ -139,7 +138,7 @@ func (v *Service) Run(ctx context.Context) error { // validateApplication calculates, validates and stores the claim and/or proofs // for each processed epoch of the application. func (v *Service) validateApplication(ctx context.Context, app Application) error { - slog.Debug("validator: starting validation", "application", app.ContractAddress) + v.Logger.Debug("starting validation", "application", app.ContractAddress) processedEpochs, err := v.repository.GetProcessedEpochs(ctx, app.ContractAddress) if err != nil { return fmt.Errorf( @@ -149,12 +148,12 @@ func (v *Service) validateApplication(ctx context.Context, app Application) erro } for _, epoch := range processedEpochs { - slog.Debug("validator: started calculating claim", + v.Logger.Debug("started calculating claim", "app", app.ContractAddress, "epoch_index", epoch.Index, ) claim, outputs, err := v.createClaimAndProofs(ctx, epoch) - slog.Info("validator: claim calculated", + v.Logger.Info("claim calculated", "app", app.ContractAddress, "epoch_index", epoch.Index, ) @@ -209,7 +208,7 @@ func (v *Service) validateApplication(ctx context.Context, app Application) erro } if len(processedEpochs) == 0 { - slog.Debug("validator: no processed epochs to validate", + v.Logger.Debug("no processed epochs to validate", "app", app.ContractAddress, ) } diff --git a/internal/validator/validator_test.go b/internal/validator/validator_test.go index db58109e1..33353b730 100644 --- a/internal/validator/validator_test.go +++ b/internal/validator/validator_test.go @@ -10,6 +10,7 @@ import ( "github.com/cartesi/rollups-node/internal/merkle" . "github.com/cartesi/rollups-node/internal/model" + "github.com/cartesi/rollups-node/pkg/service" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) @@ -33,6 +34,7 @@ func (s *ValidatorSuite) SetupSubTest() { validator = &Service{ repository: repo, } + s.Require().Nil(service.Create(&service.CreateInfo{}, &validator.Service)) dummyEpochs = []Epoch{ {Index: 0, FirstBlock: 0, LastBlock: 9}, {Index: 1, FirstBlock: 10, LastBlock: 19}, diff --git a/pkg/service/service.go b/pkg/service/service.go index afb1db1c0..a3f8e73cc 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -63,6 +63,8 @@ import ( "sync/atomic" "syscall" "time" + + "github.com/lmittmann/tint" ) var ( @@ -136,15 +138,16 @@ func Create(ci *CreateInfo, s *Service) error { // log if s.Logger == nil { - // opts := &tint.Options{ - // Level: LogLevel, - // AddSource: LogLevel == slog.LevelDebug, - // // RFC3339 with milliseconds and without timezone - // TimeFormat: "2006-01-02T15:04:05.000", - // } - // handler := tint.NewHandler(os.Stdout, opts) - // s.Logger = slog.New(handler) - s.Logger = slog.Default() + opts := &tint.Options{ + Level: slog.Level(ci.LogLevel), + AddSource: slog.Level(ci.LogLevel) == slog.LevelDebug, + // RFC3339 with milliseconds and without timezone + TimeFormat: "2006-01-02T15:04:05.000", + } + handler := tint.NewHandler(os.Stdout, opts) + s.Logger = slog.New(handler) + //s.Logger = slog.Default() + s.Logger = s.Logger.With("service", s.Name) } // context and cancelation @@ -185,7 +188,6 @@ func Create(ci *CreateInfo, s *Service) error { if ci.ServeMux == nil { if !ci.ProcOwner { s.Logger.Warn("Create:Created a new ServeMux", - "service", s.Name, "ProcOwner", ci.ProcOwner, "LogLevel", ci.LogLevel) } @@ -204,13 +206,11 @@ func Create(ci *CreateInfo, s *Service) error { // ProcOwner will be ready on the call to Serve if ci.ProcOwner { s.Logger.Info("Create", - "service", s.Name, "LogLevel", ci.LogLevel, "pid", os.Getpid()) } else { s.Running.Store(true) s.Logger.Info("Create", - "service", s.Name, "LogLevel", ci.LogLevel) } return nil @@ -231,12 +231,10 @@ func (s *Service) Reload() []error { if len(errs) > 0 { s.Logger.Error("Reload", - "service", s.Name, "duration", elapsed, "error", errs) } else { s.Logger.Info("Reload", - "service", s.Name, "duration", elapsed) } return errs @@ -249,12 +247,10 @@ func (s *Service) Tick() []error { if len(errs) > 0 { s.Logger.Error("Tick", - "service", s.Name, "duration", elapsed, "error", errs) } else { s.Logger.Debug("Tick", - "service", s.Name, "duration", elapsed) } return errs @@ -271,13 +267,11 @@ func (s *Service) Stop(force bool) []error { s.Running.Store(false) if len(errs) > 0 { s.Logger.Error("Stop", - "service", s.Name, "force", force, "duration", elapsed, "error", errs) } else { s.Logger.Info("Stop", - "service", s.Name, "force", force, "duration", elapsed) } @@ -327,7 +321,6 @@ func (s *Service) CreateDefaultTelemetry( return nil default: s.Logger.Error("http", - "service", s.Name, "error", err, "try", retry+1, "maxRetries", maxRetries,