Skip to content

Commit

Permalink
feat: support dynamic loading of dapps
Browse files Browse the repository at this point in the history
  • Loading branch information
vfusco committed Nov 1, 2024
1 parent 6069de7 commit 5f5411c
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cmd/cartesi-rollups-cli/root/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/deploy"
"github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/list"
"github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/register"
"github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/app/status"
"github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/common"
"github.com/spf13/cobra"
)
Expand All @@ -30,4 +31,5 @@ func init() {
Cmd.AddCommand(register.Cmd)
Cmd.AddCommand(deploy.Cmd)
Cmd.AddCommand(list.Cmd)
Cmd.AddCommand(status.Cmd)
}
89 changes: 89 additions & 0 deletions cmd/cartesi-rollups-cli/root/app/status/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package status

import (
"fmt"
"os"

cmdcommon "github.com/cartesi/rollups-node/cmd/cartesi-rollups-cli/root/common"
"github.com/cartesi/rollups-node/internal/model"
"github.com/ethereum/go-ethereum/common"
"github.com/spf13/cobra"
)

var Cmd = &cobra.Command{
Use: "status",
Short: "Display and set application status",
Example: examples,
Run: run,
}

const examples = `# Get application status:
cartesi-rollups-cli app status -a 0x000000000000000000000000000000000`

var (
enable bool
disable bool
)

func init() {
Cmd.Flags().StringVarP(
&cmdcommon.ApplicationAddress,
"address",
"a",
"",
"Application contract address",
)
cobra.CheckErr(Cmd.MarkFlagRequired("address"))

Cmd.Flags().BoolVarP(
&enable,
"enable",
"e",
false,
"Enable the application",
)

Cmd.Flags().BoolVarP(
&disable,
"disable",
"d",
false,
"Disable the application",
)

}

func run(cmd *cobra.Command, args []string) {
ctx := cmd.Context()

if cmdcommon.Database == nil {
panic("Database was not initialized")
}

address := common.HexToAddress(cmdcommon.ApplicationAddress)
application, err := cmdcommon.Database.GetApplication(ctx, address)
cobra.CheckErr(err)

if (!cmd.Flags().Changed("enable")) && (!cmd.Flags().Changed("disable")) {
fmt.Println(application.Status)
os.Exit(0)
}

if cmd.Flags().Changed("enable") && cmd.Flags().Changed("disable") {
fmt.Fprintln(os.Stderr, "Cannot enable and disable at the same time")
os.Exit(1)
}

status := model.ApplicationStatusRunning
if cmd.Flags().Changed("disable") {
status = model.ApplicationStatusNotRunning
}

err = cmdcommon.Database.UpdateApplicationStatus(ctx, address, status)
cobra.CheckErr(err)

fmt.Printf("Application status updated to %s\n", status)
}
7 changes: 7 additions & 0 deletions internal/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller,
// runs them through the cartesi machine,
// and updates the repository with the outputs.
func (advancer *Advancer) Step(ctx context.Context) error {
// Dynamically updates the list of machines
err := advancer.machines.UpdateMachines(ctx)
if err != nil {
return err
}

apps := advancer.machines.Apps()

// Gets the unprocessed inputs (of all apps) from the repository.
Expand Down Expand Up @@ -125,6 +131,7 @@ type Repository interface {

type Machines interface {
GetAdvanceMachine(app Address) (machines.AdvanceMachine, bool)
UpdateMachines(ctx context.Context) error
Apps() []Address
}

Expand Down
4 changes: 4 additions & 0 deletions internal/advancer/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (mock *MachinesMock) GetAdvanceMachine(app Address) (machines.AdvanceMachin
return machine, exists
}

func (m *MachinesMock) UpdateMachines(ctx context.Context) error {
return nil // FIXME
}

func (mock *MachinesMock) Apps() []Address {
return []Address{}
}
Expand Down
66 changes: 62 additions & 4 deletions internal/advancer/machines/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ type InspectMachine interface {
// Machines is a thread-safe type that manages the pool of cartesi machines being used by the node.
// It contains a map of applications to machines.
type Machines struct {
mutex sync.RWMutex
machines map[Address]*nm.NodeMachine
mutex sync.RWMutex
machines map[Address]*nm.NodeMachine
repository Repository
verbosity cm.ServerVerbosity
}

// Load initializes the cartesi machines.
Expand Down Expand Up @@ -79,7 +81,39 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*
machines[config.AppAddress] = machine
}

return &Machines{machines: machines}, errs
return &Machines{machines: machines, repository: repo, verbosity: verbosity}, errs
}

func (m *Machines) UpdateMachines(ctx context.Context) error {
configs, err := m.repository.GetMachineConfigurations(ctx)
if err != nil {
return err
}

for _, config := range configs {
if m.Exists(config.AppAddress) {
continue
}

machine, err := createMachine(ctx, m.verbosity, config)
if err != nil {
slog.Error("advancer: Failed to create machine", "app", config.AppAddress, "error", err)
continue
}

err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs)
if err != nil {
slog.Error("Failed to sync the machine", "app", config.AppAddress, "error", err)
machine.Close()
continue
}

m.Add(config.AppAddress, machine)
}

m.RemoveAbsent(configs)

return nil
}

// GetAdvanceMachine gets the machine associated with the application from the map.
Expand Down Expand Up @@ -107,6 +141,30 @@ func (m *Machines) Add(app Address, machine *nm.NodeMachine) bool {
}
}

func (m *Machines) Exists(app Address) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

_, exists := m.machines[app]
return exists
}

func (m *Machines) RemoveAbsent(configs []*MachineConfig) {
m.mutex.Lock()
defer m.mutex.Unlock()
configMap := make(map[Address]bool)
for _, config := range configs {
configMap[config.AppAddress] = true
}
for address, machine := range m.machines {
if !configMap[address] {
slog.Info("advancer: Application was disabled, shutting down machine", "application", address)
machine.Close()
delete(m.machines, address)
}
}
}

// Delete deletes an application from the map.
// It returns the associated machine, if any.
func (m *Machines) Delete(app Address) *nm.NodeMachine {
Expand Down Expand Up @@ -179,7 +237,7 @@ func createMachine(ctx context.Context,
return nil, err
}

slog.Debug("advancer: loading machine on server", "application", config.AppAddress,
slog.Info("advancer: loading machine on server", "application", config.AppAddress,
"remote-machine", address, "template-path", config.SnapshotPath)
// Creates a CartesiMachine from the snapshot.
runtimeConfig := &emulator.MachineRuntimeConfig{}
Expand Down
15 changes: 15 additions & 0 deletions internal/evmreader/evmreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type EvmReader struct {
inputBoxDeploymentBlock uint64
defaultBlock DefaultBlock
epochLengthCache map[Address]uint64
hasEnabledApps bool
}

func (r *EvmReader) String() string {
Expand All @@ -138,6 +139,7 @@ func NewEvmReader(
inputBoxDeploymentBlock: inputBoxDeploymentBlock,
defaultBlock: defaultBlock,
contractFactory: contractFactory,
hasEnabledApps: true,
}
}

Expand Down Expand Up @@ -181,6 +183,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{}
// Every time a new block arrives
slog.Debug("evmreader: New block header received", "blockNumber", header.Number, "blockHash", header.Hash())

slog.Debug("evmreader: Retrieving enabled applications")
// Get All Applications
runningApps, err := r.repository.GetAllRunningApplications(ctx)
if err != nil {
Expand All @@ -191,6 +194,18 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{}
continue
}

if len(runningApps) == 0 {
if r.hasEnabledApps {
slog.Info("evmreader: No registered applications enabled")
}
r.hasEnabledApps = false
continue
}
if !r.hasEnabledApps {
slog.Info("evmreader: Found enabled applications")
}
r.hasEnabledApps = true

// Build Contracts
var apps []application
for _, app := range runningApps {
Expand Down
33 changes: 33 additions & 0 deletions internal/repository/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,39 @@ func (pg *Database) GetApplication(
return &app, nil
}

func (pg *Database) UpdateApplicationStatus(
ctx context.Context,
appAddressKey Address,
newStatus ApplicationStatus,
) error {
query := `
UPDATE
application
SET
status = @newStatus
WHERE
contract_address = @contractAddress`

args := pgx.NamedArgs{
"contractAddress": appAddressKey,
"newStatus": newStatus,
}

commandTag, err := pg.db.Exec(ctx, query, args)
if err != nil {
return fmt.Errorf("UpdateApplicationStatus Exec failed: %w", err)
}

if commandTag.RowsAffected() == 0 {
slog.Debug("UpdateApplicationStatus affected no rows",
"service", "repository",
"app", appAddressKey)
return fmt.Errorf("no application found with contract address: %s", appAddressKey)
}

return nil
}

func (pg *Database) GetEpochs(ctx context.Context, application Address) ([]Epoch, error) {
query := `
SELECT
Expand Down

0 comments on commit 5f5411c

Please sign in to comment.