Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist Auctioneer Bid DB to S3 #2745

Merged
merged 14 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/autonomous-auctioneer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (c *AutonomousAuctioneerConfig) GetReloadInterval() time.Duration {
}

func (c *AutonomousAuctioneerConfig) Validate() error {
if err := c.AuctioneerServer.S3Storage.Validate(); err != nil {
return err
}
return nil
}

Expand Down
40 changes: 6 additions & 34 deletions das/s3_storage_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,22 @@ import (
"bytes"
"context"
"fmt"
"io"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/pretty"
"github.com/offchainlabs/nitro/util/s3client"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"

flag "github.com/spf13/pflag"
)

type S3Uploader interface {
Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error)
}

type S3Downloader interface {
Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error)
}

type S3StorageServiceConfig struct {
Enable bool `koanf:"enable"`
AccessKey string `koanf:"access-key"`
Expand All @@ -56,48 +46,30 @@ func S3ConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type S3StorageService struct {
client *s3.Client
client s3client.FullClient
bucket string
objectPrefix string
uploader S3Uploader
downloader S3Downloader
discardAfterTimeout bool
}

func NewS3StorageService(config S3StorageServiceConfig) (StorageService, error) {
client, err := buildS3Client(config.AccessKey, config.SecretKey, config.Region)
client, err := s3client.NewS3FullClient(config.AccessKey, config.SecretKey, config.Region)
if err != nil {
return nil, err
}
return &S3StorageService{
client: client,
bucket: config.Bucket,
objectPrefix: config.ObjectPrefix,
uploader: manager.NewUploader(client),
downloader: manager.NewDownloader(client),
discardAfterTimeout: config.DiscardAfterTimeout,
}, nil
}

func buildS3Client(accessKey, secretKey, region string) (*s3.Client, error) {
cfg, err := awsConfig.LoadDefaultConfig(context.TODO(), awsConfig.WithRegion(region), func(options *awsConfig.LoadOptions) error {
// remain backward compatible with accessKey and secretKey credentials provided via cli flags
if accessKey != "" && secretKey != "" {
options.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
}
return nil
})
if err != nil {
return nil, err
}
return s3.NewFromConfig(cfg), nil
}

func (s3s *S3StorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) {
log.Trace("das.S3StorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s3s)

buf := manager.NewWriteAtBuffer([]byte{})
_, err := s3s.downloader.Download(ctx, buf, &s3.GetObjectInput{
_, err := s3s.client.Download(ctx, buf, &s3.GetObjectInput{
Bucket: aws.String(s3s.bucket),
Key: aws.String(s3s.objectPrefix + EncodeStorageServiceKey(key)),
})
Expand All @@ -114,7 +86,7 @@ func (s3s *S3StorageService) Put(ctx context.Context, value []byte, timeout uint
expires := time.Unix(int64(timeout), 0)
putObjectInput.Expires = &expires
}
_, err := s3s.uploader.Upload(ctx, &putObjectInput)
_, err := s3s.client.Upload(ctx, &putObjectInput)
if err != nil {
log.Error("das.S3StorageService.Store", "err", err)
}
Expand All @@ -141,6 +113,6 @@ func (s3s *S3StorageService) String() string {
}

func (s3s *S3StorageService) HealthCheck(ctx context.Context) error {
_, err := s3s.client.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(s3s.bucket)})
_, err := s3s.client.Client().HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(s3s.bucket)})
return err
}
21 changes: 11 additions & 10 deletions das/s3_storage_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ import (
"github.com/offchainlabs/nitro/das/dastree"
)

type mockS3Uploader struct {
type mockS3FullClient struct {
mockStorageService StorageService
}

func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
func (m *mockS3FullClient) Client() *s3.Client {
return nil
}

func (m *mockS3FullClient) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(input.Body)
if err != nil {
Expand All @@ -33,11 +37,7 @@ func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, o
return nil, err
}

type mockS3Downloader struct {
mockStorageService StorageService
}

func (m *mockS3Downloader) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
func (m *mockS3FullClient) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
key, err := DecodeStorageServiceKey(*input.Key)
if err != nil {
return 0, err
Expand All @@ -56,10 +56,11 @@ func (m *mockS3Downloader) Download(ctx context.Context, w io.WriterAt, input *s

func NewTestS3StorageService(ctx context.Context, s3Config genericconf.S3Config) (StorageService, error) {
mockStorageService := NewMemoryBackedStorageService(ctx)
s3FullClient := &mockS3FullClient{mockStorageService}
return &S3StorageService{
bucket: s3Config.Bucket,
uploader: &mockS3Uploader{mockStorageService},
downloader: &mockS3Downloader{mockStorageService}}, nil
bucket: s3Config.Bucket,
client: s3FullClient,
}, nil
}

func TestS3StorageService(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions timeboost/auctioneer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ type AuctioneerServerConfig struct {
SequencerJWTPath string `koanf:"sequencer-jwt-path"`
AuctionContractAddress string `koanf:"auction-contract-address"`
DbDirectory string `koanf:"db-directory"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
}

var DefaultAuctioneerServerConfig = AuctioneerServerConfig{
Enable: true,
RedisURL: "",
ConsumerConfig: pubsub.DefaultConsumerConfig,
StreamTimeout: 10 * time.Minute,
S3Storage: DefaultS3StorageServiceConfig,
}

var TestAuctioneerServerConfig = AuctioneerServerConfig{
Expand All @@ -92,6 +94,7 @@ func AuctioneerServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".sequencer-jwt-path", DefaultAuctioneerServerConfig.SequencerJWTPath, "sequencer jwt file path")
f.String(prefix+".auction-contract-address", DefaultAuctioneerServerConfig.AuctionContractAddress, "express lane auction contract address")
f.String(prefix+".db-directory", DefaultAuctioneerServerConfig.DbDirectory, "path to database directory for persisting validated bids in a sqlite file")
S3StorageServiceConfigAddOptions(prefix+".s3-storage", f)
}

// AuctioneerServer is a struct that represents an autonomous auctioneer.
Expand All @@ -112,6 +115,7 @@ type AuctioneerServer struct {
roundDuration time.Duration
streamTimeout time.Duration
database *SqliteDatabase
s3StorageService *S3StorageService
}

// NewAuctioneerServer creates a new autonomous auctioneer struct.
Expand All @@ -133,6 +137,13 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
if err != nil {
return nil, err
}
var s3StorageService *S3StorageService
if cfg.S3Storage.Enable {
s3StorageService, err = NewS3StorageService(&cfg.S3Storage, database)
if err != nil {
return nil, err
}
}
auctionContractAddr := common.HexToAddress(cfg.AuctionContractAddress)
redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
Expand Down Expand Up @@ -194,6 +205,7 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
chainId: chainId,
client: sequencerClient,
database: database,
s3StorageService: s3StorageService,
consumer: c,
auctionContract: auctionContract,
auctionContractAddr: auctionContractAddr,
Expand All @@ -207,6 +219,10 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf

func (a *AuctioneerServer) Start(ctx_in context.Context) {
a.StopWaiter.Start(ctx_in, a)
// Start S3 storage service to persist validated bids to s3
if a.s3StorageService != nil {
a.s3StorageService.Start(ctx_in)
}
// Channel that consumer uses to indicate its readiness.
readyStream := make(chan struct{}, 1)
a.consumer.Start(ctx_in)
Expand Down
30 changes: 30 additions & 0 deletions timeboost/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,36 @@ func (d *SqliteDatabase) InsertBid(b *ValidatedBid) error {
return nil
}

func (d *SqliteDatabase) GetBids(maxDbRows int) ([]*SqliteDatabaseBid, uint64, error) {
d.lock.Lock()
defer d.lock.Unlock()
var maxRound uint64
query := `SELECT MAX(Round) FROM Bids`
err := d.sqlDB.Get(&maxRound, query)
if err != nil {
return nil, 0, fmt.Errorf("failed to fetch maxRound from bids: %w", err)
}
var sqlDBbids []*SqliteDatabaseBid
if maxDbRows == 0 {
if err := d.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids WHERE Round < ? ORDER BY Round ASC", maxRound); err != nil {
return nil, 0, err
}
return sqlDBbids, maxRound, nil
}
if err := d.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids WHERE Round < ? ORDER BY Round ASC LIMIT ?", maxRound, maxDbRows); err != nil {
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return nil, 0, err
}
// We should return contiguous set of bids
for i := len(sqlDBbids) - 1; i > 0; i-- {
if sqlDBbids[i].Round != sqlDBbids[i-1].Round {
return sqlDBbids[:i], sqlDBbids[i].Round, nil
}
}
// If we can't determine a contiguous set of bids, we abort and retry again.
// Saves us from cases where we sometime push same batch data twice
return nil, 0, nil
}

func (d *SqliteDatabase) DeleteBids(round uint64) error {
d.lock.Lock()
defer d.lock.Unlock()
Expand Down
12 changes: 1 addition & 11 deletions timeboost/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,6 @@ import (

func TestInsertAndFetchBids(t *testing.T) {
t.Parallel()
type DatabaseBid struct {
Id uint64 `db:"Id"`
ChainId string `db:"ChainId"`
Bidder string `db:"Bidder"`
ExpressLaneController string `db:"ExpressLaneController"`
AuctionContractAddress string `db:"AuctionContractAddress"`
Round uint64 `db:"Round"`
Amount string `db:"Amount"`
Signature string `db:"Signature"`
}

tmpDir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
Expand Down Expand Up @@ -56,7 +46,7 @@ func TestInsertAndFetchBids(t *testing.T) {
for _, bid := range bids {
require.NoError(t, db.InsertBid(bid))
}
gotBids := make([]*DatabaseBid, 2)
gotBids := make([]*SqliteDatabaseBid, 2)
err = db.sqlDB.Select(&gotBids, "SELECT * FROM Bids ORDER BY Id")
require.NoError(t, err)
require.Equal(t, bids[0].Amount.String(), gotBids[0].Amount)
Expand Down
Loading