diff --git a/cmd/cartesi-rollups-cli/root/app/app.go b/cmd/cartesi-rollups-cli/root/app/app.go index 2096fce44..71b0f376c 100644 --- a/cmd/cartesi-rollups-cli/root/app/app.go +++ b/cmd/cartesi-rollups-cli/root/app/app.go @@ -7,6 +7,7 @@ import ( "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/deploy" "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/list" "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/register" + "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/status" "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/common" "github.com/spf13/cobra" ) @@ -30,4 +31,5 @@ func init() { Cmd.AddCommand(register.Cmd) Cmd.AddCommand(deploy.Cmd) Cmd.AddCommand(list.Cmd) + Cmd.AddCommand(status.Cmd) } diff --git a/cmd/cartesi-rollups-cli/root/app/status/status.go b/cmd/cartesi-rollups-cli/root/app/status/status.go new file mode 100644 index 000000000..78e484e03 --- /dev/null +++ b/cmd/cartesi-rollups-cli/root/app/status/status.go @@ -0,0 +1,89 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package status + +import ( + "fmt" + "os" + + cmdcommon "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/common" + "github.com/cartesi/rollups-node/internal/model" + "github.com/ethereum/go-ethereum/common" + "github.com/spf13/cobra" +) + +var Cmd = &cobra.Command{ + Use: "status", + Short: "Display and set application status", + Example: examples, + Run: run, +} + +const examples = `# Get application status: +cartesi-rollups-cli app status -a 0x000000000000000000000000000000000` + +var ( + enable bool + disable bool +) + +func init() { + Cmd.Flags().StringVarP( + &cmdcommon.ApplicationAddress, + "address", + "a", + "", + "Application contract address", + ) + cobra.CheckErr(Cmd.MarkFlagRequired("address")) + + Cmd.Flags().BoolVarP( + &enable, + "enable", + "e", + false, + "Enable the application", + ) + + Cmd.Flags().BoolVarP( + &disable, + "disable", + "d", + false, + "Disable the application", + ) + +} + +func run(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + + if cmdcommon.Database == nil { + panic("Database was not initialized") + } + + address := common.HexToAddress(cmdcommon.ApplicationAddress) + application, err := cmdcommon.Database.GetApplication(ctx, address) + cobra.CheckErr(err) + + if (!cmd.Flags().Changed("enable")) && (!cmd.Flags().Changed("disable")) { + fmt.Println(application.Status) + os.Exit(0) + } + + if cmd.Flags().Changed("enable") && cmd.Flags().Changed("disable") { + fmt.Fprintln(os.Stderr, "Cannot enable and disable at the same time") + os.Exit(1) + } + + status := model.ApplicationStatusRunning + if cmd.Flags().Changed("disable") { + status = model.ApplicationStatusNotRunning + } + + err = cmdcommon.Database.UpdateApplicationStatus(ctx, address, status) + cobra.CheckErr(err) + + fmt.Printf("Application status updated to %s\n", status) +} diff --git a/internal/advancer/advancer.go b/internal/advancer/advancer.go index 3f660df7d..fa3213705 100644 --- a/internal/advancer/advancer.go +++ b/internal/advancer/advancer.go @@ -50,6 +50,12 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller, // runs them through the cartesi machine, // and updates the repository with the outputs. func (advancer *Advancer) Step(ctx context.Context) error { + // Dynamically updates the list of machines + err := advancer.machines.UpdateMachines(ctx) + if err != nil { + return err + } + apps := advancer.machines.Apps() // Gets the unprocessed inputs (of all apps) from the repository. @@ -125,6 +131,7 @@ type Repository interface { type Machines interface { GetAdvanceMachine(app Address) (machines.AdvanceMachine, bool) + UpdateMachines(ctx context.Context) error Apps() []Address } diff --git a/internal/advancer/advancer_test.go b/internal/advancer/advancer_test.go index 847af8405..f86433264 100644 --- a/internal/advancer/advancer_test.go +++ b/internal/advancer/advancer_test.go @@ -226,6 +226,10 @@ func (mock *MachinesMock) GetAdvanceMachine(app Address) (machines.AdvanceMachin return machine, exists } +func (m *MachinesMock) UpdateMachines(ctx context.Context) error { + return nil // FIXME +} + func (mock *MachinesMock) Apps() []Address { return []Address{} } diff --git a/internal/advancer/machines/machines.go b/internal/advancer/machines/machines.go index cd9c95bd6..97eafedb5 100644 --- a/internal/advancer/machines/machines.go +++ b/internal/advancer/machines/machines.go @@ -41,8 +41,10 @@ type InspectMachine interface { // Machines is a thread-safe type that manages the pool of cartesi machines being used by the node. // It contains a map of applications to machines. type Machines struct { - mutex sync.RWMutex - machines map[Address]*nm.NodeMachine + mutex sync.RWMutex + machines map[Address]*nm.NodeMachine + repository Repository + verbosity cm.ServerVerbosity } // Load initializes the cartesi machines. @@ -79,7 +81,39 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (* machines[config.AppAddress] = machine } - return &Machines{machines: machines}, errs + return &Machines{machines: machines, repository: repo, verbosity: verbosity}, errs +} + +func (m *Machines) UpdateMachines(ctx context.Context) error { + configs, err := m.repository.GetMachineConfigurations(ctx) + if err != nil { + return err + } + + for _, config := range configs { + if m.Exists(config.AppAddress) { + continue + } + + machine, err := createMachine(ctx, m.verbosity, config) + if err != nil { + slog.Error("advancer: Failed to create machine", "app", config.AppAddress, "error", err) + continue + } + + err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs) + if err != nil { + slog.Error("Failed to sync the machine", "app", config.AppAddress, "error", err) + machine.Close() + continue + } + + m.Add(config.AppAddress, machine) + } + + m.RemoveAbsent(configs) + + return nil } // GetAdvanceMachine gets the machine associated with the application from the map. @@ -107,6 +141,30 @@ func (m *Machines) Add(app Address, machine *nm.NodeMachine) bool { } } +func (m *Machines) Exists(app Address) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + _, exists := m.machines[app] + return exists +} + +func (m *Machines) RemoveAbsent(configs []*MachineConfig) { + m.mutex.Lock() + defer m.mutex.Unlock() + configMap := make(map[Address]bool) + for _, config := range configs { + configMap[config.AppAddress] = true + } + for address, machine := range m.machines { + if !configMap[address] { + slog.Info("advancer: Application was disabled, shutting down machine", "application", address) + machine.Close() + delete(m.machines, address) + } + } +} + // Delete deletes an application from the map. // It returns the associated machine, if any. func (m *Machines) Delete(app Address) *nm.NodeMachine { @@ -179,7 +237,7 @@ func createMachine(ctx context.Context, return nil, err } - slog.Debug("advancer: loading machine on server", "application", config.AppAddress, + slog.Info("advancer: loading machine on server", "application", config.AppAddress, "remote-machine", address, "template-path", config.SnapshotPath) // Creates a CartesiMachine from the snapshot. runtimeConfig := &emulator.MachineRuntimeConfig{} diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index 5e5068503..5d3d633ea 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -114,6 +114,7 @@ type EvmReader struct { inputBoxDeploymentBlock uint64 defaultBlock DefaultBlock epochLengthCache map[Address]uint64 + hasEnabledApps bool } func (r *EvmReader) String() string { @@ -138,6 +139,7 @@ func NewEvmReader( inputBoxDeploymentBlock: inputBoxDeploymentBlock, defaultBlock: defaultBlock, contractFactory: contractFactory, + hasEnabledApps: true, } } @@ -181,6 +183,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} // Every time a new block arrives slog.Debug("evmreader: New block header received", "blockNumber", header.Number, "blockHash", header.Hash()) + slog.Debug("evmreader: Retrieving enabled applications") // Get All Applications runningApps, err := r.repository.GetAllRunningApplications(ctx) if err != nil { @@ -191,6 +194,18 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} continue } + if len(runningApps) == 0 { + if r.hasEnabledApps { + slog.Info("evmreader: No registered applications enabled") + } + r.hasEnabledApps = false + continue + } + if !r.hasEnabledApps { + slog.Info("evmreader: Found enabled applications") + } + r.hasEnabledApps = true + // Build Contracts var apps []application for _, app := range runningApps { diff --git a/internal/repository/base.go b/internal/repository/base.go index 7d97a6432..b9106669a 100644 --- a/internal/repository/base.go +++ b/internal/repository/base.go @@ -493,6 +493,39 @@ func (pg *Database) GetApplication( return &app, nil } +func (pg *Database) UpdateApplicationStatus( + ctx context.Context, + appAddressKey Address, + newStatus ApplicationStatus, +) error { + query := ` + UPDATE + application + SET + status = @newStatus + WHERE + contract_address = @contractAddress` + + args := pgx.NamedArgs{ + "contractAddress": appAddressKey, + "newStatus": newStatus, + } + + commandTag, err := pg.db.Exec(ctx, query, args) + if err != nil { + return fmt.Errorf("UpdateApplicationStatus Exec failed: %w", err) + } + + if commandTag.RowsAffected() == 0 { + slog.Debug("UpdateApplicationStatus affected no rows", + "service", "repository", + "app", appAddressKey) + return fmt.Errorf("no application found with contract address: %s", appAddressKey) + } + + return nil +} + func (pg *Database) GetEpochs(ctx context.Context, application Address) ([]Epoch, error) { query := ` SELECT