Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: process fee vault withdrawals #24

Merged
merged 8 commits into from
Mar 7, 2024
113 changes: 8 additions & 105 deletions cmd/bot/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ import (
"context"
"errors"
"fmt"
"math/big"
"net/http"
"strings"
"time"

"github.com/ethereum-optimism/optimism/indexer/config"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/urfave/cli/v2"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

func RunCommand(ctx *cli.Context) error {
Expand Down Expand Up @@ -52,11 +48,12 @@ func RunCommand(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to connect database: %w", err)
}

err = db.AutoMigrate(&core.L2ScannedBlock{})
if err != nil {
return fmt.Errorf("failed to migrate l2_scanned_blocks: %w", err)
}
err = db.AutoMigrate(&core.BotDelegatedWithdrawal{})
err = db.AutoMigrate(&core.WithdrawalInitiatedLog{})
if err != nil {
return fmt.Errorf("failed to migrate withdrawals: %w", err)
}
Expand All @@ -78,7 +75,10 @@ func RunCommand(ctx *cli.Context) error {
}()

go core.StartMetrics(ctx.Context, &cfg, &l1Client.Client, db, logger)
go WatchBotDelegatedWithdrawals(ctx.Context, logger, db, l2Client, l2ScannedBlock, cfg)
go func() {
indexer := core.NewIndexer(logger, db, l2Client, cfg)
indexer.Start(ctx.Context, l2ScannedBlock)
}()
go ProcessBotDelegatedWithdrawals(ctx.Context, logger, db, l1Client, l2Client, cfg)

<-ctx.Context.Done()
Expand Down Expand Up @@ -130,7 +130,7 @@ func ProcessUnprovenBotDelegatedWithdrawals(ctx context.Context, log log.Logger,
processor := core.NewProcessor(log, l1Client, l2Client, cfg)
limit := 1000

unprovens := make([]core.BotDelegatedWithdrawal, 0)
unprovens := make([]core.WithdrawalInitiatedLog, 0)
result := db.Order("id asc").Where("proven_time IS NULL AND initiated_block_number <= ? AND failure_reason IS NULL", latestProposedNumber.Uint64()).Limit(limit).Find(&unprovens)
if result.Error != nil {
log.Error("failed to query withdrawals", "error", result.Error)
Expand Down Expand Up @@ -182,7 +182,7 @@ func ProcessUnfinalizedBotDelegatedWithdrawals(ctx context.Context, log log.Logg
now := time.Now()
maxProvenTime := now.Add(-time.Duration(cfg.ChallengeTimeWindow) * time.Second)

unfinalizeds := make([]core.BotDelegatedWithdrawal, 0)
unfinalizeds := make([]core.WithdrawalInitiatedLog, 0)
result := db.Order("id asc").Where("finalized_time IS NULL AND proven_time IS NOT NULL AND proven_time < ? AND failure_reason IS NULL", maxProvenTime).Limit(limit).Find(&unfinalizeds)
if result.Error != nil {
log.Error("failed to query withdrawals", "error", result.Error)
Expand Down Expand Up @@ -228,103 +228,6 @@ func ProcessUnfinalizedBotDelegatedWithdrawals(ctx context.Context, log log.Logg
}
}

// storeLogs stores the logs in the database
func storeLogs(db *gorm.DB, client *core.ClientExt, logs []types.Log) error {
// save all the logs in this range of blocks
for _, vLog := range logs {
header, err := client.HeaderByHash(context.Background(), vLog.BlockHash)
if err != nil {
return err
}

event := core.BotDelegatedWithdrawal{
TransactionHash: vLog.TxHash.Hex(),
LogIndex: int(vLog.Index),
InitiatedBlockNumber: int64(header.Number.Uint64()),
}

deduped := db.Clauses(
clause.OnConflict{DoNothing: true},
)
result := deduped.Create(&event)
if result.Error != nil {
return result.Error
}
}

return nil
}

// WatchBotDelegatedWithdrawals watches for new bot-delegated withdrawals and stores them in the database.
func WatchBotDelegatedWithdrawals(ctx context.Context, log log.Logger, db *gorm.DB, client *core.ClientExt, l2StartingBlock *core.L2ScannedBlock, cfg core.Config) {
timer := time.NewTimer(0)
fromBlockNumber := big.NewInt(l2StartingBlock.Number)

for {
select {
case <-ctx.Done():
return
case <-timer.C:
timer.Reset(time.Second)
}

toBlockNumber := new(big.Int).Add(fromBlockNumber, big.NewInt(cfg.L2StandardBridgeBot.LogFilterBlockRange))
finalizedHeader, err := client.GetHeaderByTag(context.Background(), "finalized")
if err != nil {
log.Error("call eth_blockNumber", "error", err)
continue
}
if toBlockNumber.Uint64() > finalizedHeader.Number.Uint64() {
toBlockNumber = finalizedHeader.Number
}

if fromBlockNumber.Uint64() > toBlockNumber.Uint64() {
timer.Reset(5 * time.Second)
continue
}

log.Info("Fetching logs from blocks", "fromBlock", fromBlockNumber, "toBlock", toBlockNumber)
logs, err := getLogs(client, fromBlockNumber, toBlockNumber, common.HexToAddress(cfg.L2StandardBridgeBot.ContractAddress), core.WithdrawToEventSig())
if err != nil {
log.Error("eth_getLogs", "error", err)
continue
}

if len(logs) != 0 {
for _, vlog := range logs {
log.Info("fetched bot-delegated withdrawal", "blockNumber", vlog.BlockNumber, "transactionHash", vlog.TxHash.Hex())
}

err = storeLogs(db, client, logs)
if err != nil {
log.Error("storeLogs", "error", err)
continue
}
}

l2StartingBlock.Number = toBlockNumber.Int64()
result := db.Save(l2StartingBlock)
if result.Error != nil {
log.Error("update l2_scanned_blocks", "error", result.Error)
}

fromBlockNumber = new(big.Int).Add(toBlockNumber, big.NewInt(1))
}
}

// getLogs returns the logs for a given contract address and block range
func getLogs(client *core.ClientExt, fromBlock *big.Int, toBlock *big.Int, contractAddress common.Address, eventSig common.Hash) ([]types.Log, error) {
query := ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: toBlock,
Addresses: []common.Address{
contractAddress,
},
Topics: [][]common.Hash{[]common.Hash{eventSig}},
}
return client.FilterLogs(context.Background(), query)
}

// connect connects to the database
func connect(log log.Logger, dbConfig config.DBConfig) (*gorm.DB, error) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8&parseTime=True&loc=Local", dbConfig.User, dbConfig.Password, dbConfig.Host, dbConfig.Port, dbConfig.Name)
Expand Down
Loading
Loading