Skip to content

Commit

Permalink
enhancement
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Apr 17, 2024
1 parent 1e47fc8 commit acd4524
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 16 deletions.
2 changes: 1 addition & 1 deletion config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ const (
EnvVarDBUserPass = "DB_PASSWORD"
EnvVarPrivateKey = "PRIVATE_KEY"

DefaultCreateBundleSlotInterval = 10
DefaultCreateBundleSlotInterval = 20
)
1 change: 1 addition & 0 deletions db/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ type Status int
const (
Processed Status = 0
Verified Status = 1 // each block's blobs will be verified by the post-verification process
Skipped Status = 2
)

type Block struct {
Expand Down
1 change: 1 addition & 0 deletions db/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
Finalizing InnerBundleStatus = 0
Finalized InnerBundleStatus = 1 // when a bundle is uploaded to bundle service, its status will be Finalized
Sealed InnerBundleStatus = 2 // todo The post verification process should check if a bundle is indeed sealed onchain
Deprecated InnerBundleStatus = 3
)

type Bundle struct {
Expand Down
18 changes: 13 additions & 5 deletions db/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type BlockDB interface {
GetBlockByRoot(root string) (*Block, error)
GetLatestProcessedBlock() (*Block, error)
GetEarliestUnverifiedBlock() (*Block, error)
UpdateBlockToVerifiedStatus(slot uint64) error
UpdateBlockStatus(slot uint64, status Status) error
UpdateBlocksStatus(startSlot, endSlot uint64, status Status) error
}

func (d *BlobSvcDB) GetBlock(slot uint64) (*Block, error) {
Expand Down Expand Up @@ -65,10 +66,17 @@ func (d *BlobSvcDB) GetEarliestUnverifiedBlock() (*Block, error) {
return &block, nil
}

func (d *BlobSvcDB) UpdateBlockToVerifiedStatus(slot uint64) error {
func (d *BlobSvcDB) UpdateBlockStatus(slot uint64, status Status) error {
return d.db.Transaction(func(dbTx *gorm.DB) error {
return dbTx.Model(Block{}).Where("slot = ?", slot).Updates(
Block{Status: Verified}).Error
Block{Status: status}).Error
})
}

func (d *BlobSvcDB) UpdateBlocksStatus(startSlot, endSlot uint64, status Status) error {
return d.db.Transaction(func(dbTx *gorm.DB) error {
return dbTx.Model(Block{}).Where("slot >= ? and slot <= ?", startSlot, endSlot).Updates(
Block{Status: status}).Error
})
}

Expand Down Expand Up @@ -138,12 +146,12 @@ func (d *BlobSvcDB) UpdateBundleStatus(bundleName string, status InnerBundleStat
func (d *BlobSvcDB) SaveBlockAndBlob(block *Block, blobs []*Blob) error {
return d.db.Transaction(func(dbTx *gorm.DB) error {
err := dbTx.Save(block).Error
if err != nil && MysqlErrCode(err) == ErrDuplicateEntryCode {
if err != nil && MysqlErrCode(err) != ErrDuplicateEntryCode {
return err
}
if len(blobs) != 0 {
err = dbTx.Save(blobs).Error
if err != nil && MysqlErrCode(err) == ErrDuplicateEntryCode {
if err != nil && MysqlErrCode(err) != ErrDuplicateEntryCode {
return err
}
}
Expand Down
19 changes: 18 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"strings"
"time"

"gorm.io/gorm"

"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
v1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"gorm.io/gorm"

"github.com/bnb-chain/blob-syncer/config"
"github.com/bnb-chain/blob-syncer/db"
Expand Down Expand Up @@ -323,6 +324,22 @@ func (s *BlobSyncer) LoadProgressAndResume(nextSlot uint64) error {
if err != nil {
return err
}

// might no longer need to process the bundle even-thought it is not finalized if the user set the config to skip it.
if nextSlot > endSlot {
err = s.blobDao.UpdateBlocksStatus(startSlot, endSlot, db.Skipped)
if err != nil {
logging.Logger.Errorf("failed to update blocks status, startSlot=%d, endSlot=%d", startSlot, endSlot)
return err
}
logging.Logger.Infof("the config slot number %d is larger than the recorded bundle end slot %d, will resume from the config slot", nextSlot, endSlot)
if err = s.blobDao.UpdateBundleStatus(finalizingBundle.Name, db.Deprecated); err != nil {
return err
}
startSlot = nextSlot
endSlot = nextSlot + s.getCreateBundleSlotInterval() - 1
}

}
s.bundleDetail = &curBundleDetail{
name: types.GetBundleName(startSlot, endSlot),
Expand Down
40 changes: 31 additions & 9 deletions syncer/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"path/filepath"
"time"

"github.com/prysmaticlabs/prysm/v5/api/server/structs"
"gorm.io/gorm"

"github.com/prysmaticlabs/prysm/v5/api/server/structs"

"github.com/bnb-chain/blob-syncer/db"
"github.com/bnb-chain/blob-syncer/external"
"github.com/bnb-chain/blob-syncer/logging"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (s *BlobSyncer) verify() error {
}
verifyBlockSlot := verifyBlock.Slot
if verifyBlock.BlobCount == 0 {
if err = s.blobDao.UpdateBlockToVerifiedStatus(verifyBlockSlot); err != nil {
if err = s.blobDao.UpdateBlockStatus(verifyBlockSlot, db.Verified); err != nil {
logging.Logger.Errorf("failed to update block status, slot=%d err=%s", verifyBlockSlot, err.Error())
return err
}
Expand Down Expand Up @@ -100,8 +101,8 @@ func (s *BlobSyncer) verify() error {
}
return err
}
if err = s.blobDao.UpdateBlockToVerifiedStatus(verifyBlockSlot); err != nil {
logging.Logger.Errorf("failed to update block status, slot=%d err=%s", verifyBlockSlot, err.Error())
if err = s.blobDao.UpdateBlockStatus(verifyBlockSlot, db.Verified); err != nil {
logging.Logger.Errorf("failed to update block status to verified, slot=%d err=%s", verifyBlockSlot, err.Error())
return err
}
metrics.VerifiedSlotGauge.Set(float64(verifyBlockSlot))
Expand Down Expand Up @@ -177,6 +178,9 @@ func (s *BlobSyncer) verifyBlobAtSlot(slot uint64, sidecars []*structs.Sidecar,
}

func (s *BlobSyncer) reUploadBundle(bundleName string) error {
if err := s.blobDao.UpdateBundleStatus(bundleName, db.Deprecated); err != nil {
return err
}
newBundleName := bundleName + "_calibrated_" + util.Int64ToString(time.Now().Unix())
startSlot, endSlot, err := types.ParseBundleName(bundleName)
if err != nil {
Expand All @@ -190,12 +194,13 @@ func (s *BlobSyncer) reUploadBundle(bundleName string) error {
}
}
if err = s.blobDao.CreateBundle(&db.Bundle{
Name: newBundleName,
Status: db.Finalizing,
Name: newBundleName,
Status: db.Finalizing,
Calibrated: true,
}); err != nil {
return err
}
for slot := startSlot; slot < endSlot; slot++ {
for slot := startSlot; slot <= endSlot; slot++ {
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
sideCars, err := s.ethClients.BeaconClient.GetBlob(ctx, slot)
Expand All @@ -206,22 +211,39 @@ func (s *BlobSyncer) reUploadBundle(bundleName string) error {
return err
}
block, err := s.ethClients.BeaconClient.GetBlock(ctx, slot)
if err != nil {
if err == external.ErrBlockNotFound {
continue
}
return err
}
blockMeta, err := s.blobDao.GetBlock(slot)
if err != nil {
return err
}
blobMetas, err := s.blobDao.GetBlobBySlot(slot)
if err != nil {
return err
}
blockToSave, blobToSave, err := s.ToBlockAndBlobs(block, sideCars, slot, newBundleName)
if err != nil {
return err
}
blockToSave.Id = blockMeta.Id
for i, preBlob := range blobMetas {
if i < len(blobToSave) {
blobToSave[i].Id = preBlob.Id
}
}
err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave)
if err != nil {
logging.Logger.Errorf("failed to save block(h=%d) and Blob(count=%d), err=%s", blockToSave.Slot, len(blobToSave), err.Error())
return err
}
logging.Logger.Infof("save calibrated block(slot=%d) and blobs(num=%d) to DB \n", slot, len(blobToSave))
}

if err := s.finalizeBundle(newBundleName, s.getBundleDir(newBundleName), s.getBundleFilePath(newBundleName)); err != nil {
if err = s.finalizeBundle(newBundleName, s.getBundleDir(newBundleName), s.getBundleFilePath(newBundleName)); err != nil {
logging.Logger.Errorf("failed to finalized bundle, name=%s, err=%s", newBundleName, err.Error())
return err
}
return nil
Expand Down

0 comments on commit acd4524

Please sign in to comment.