Skip to content

Commit

Permalink
fix: fix when the DB written fails, in-memory bundle detail inconsistent
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Jul 4, 2024
1 parent f696c37 commit 4e2253f
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncer
import (
"context"
"encoding/hex"
"errors"
"fmt"
"math/big"
"os"
Expand All @@ -13,6 +14,7 @@ import (

"gorm.io/gorm"

"github.com/bnb-chain/blob-hub/metrics"
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
Expand All @@ -26,7 +28,6 @@ import (
"github.com/bnb-chain/blob-hub/external/cmn"
"github.com/bnb-chain/blob-hub/external/eth"
"github.com/bnb-chain/blob-hub/logging"
"github.com/bnb-chain/blob-hub/metrics"
"github.com/bnb-chain/blob-hub/types"
"github.com/bnb-chain/blob-hub/util"
)
Expand Down Expand Up @@ -198,26 +199,37 @@ func (s *BlobSyncer) sync() error {
if err != nil {
return err
}

var dbErr error
if isForkedBlock {
return s.blobDao.SaveBlockAndBlob(&db.Block{
dbErr = s.blobDao.SaveBlockAndBlob(&db.Block{
Slot: blockID,
BundleName: bundleName,
}, nil)
} else {
blockToSave, blobToSave, err := s.toBlockAndBlobs(block, sideCars, blockID, bundleName)
if err == nil {
err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave)
if err == nil {
metrics.SyncedBlockIDGauge.Set(float64(blockID))
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
}
}
dbErr = err
}

blockToSave, blobToSave, err := s.toBlockAndBlobs(block, sideCars, blockID, bundleName)
if err != nil {
return err
if dbErr != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error())
return dbErr
}

err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave)
if err != nil {
logging.Logger.Errorf("failed to save block(h=%d) and Blob(count=%d), err=%s", blockToSave.Slot, len(blobToSave), err.Error())
return err
if blockID == s.bundleDetail.finalizeBlockID {
// init next bundle
startBlockID := blockID + 1
endBlockID := blockID + s.getCreateBundleInterval()
s.bundleDetail = &curBundleDetail{
name: types.GetBundleName(startBlockID, endBlockID),
startBlockID: startBlockID,
finalizeBlockID: endBlockID,
}
}
metrics.SyncedBlockIDGauge.Set(float64(blockID))
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
return nil
}

Expand All @@ -239,14 +251,6 @@ func (s *BlobSyncer) process(bundleName string, blockID uint64, sidecars []*type
return err
}
logging.Logger.Infof("finalized bundle, bundle_name=%s, bucket_name=%s\n", bundleName, s.getBucketName())
// init next bundle
startBlockID := blockID + 1
endBlockID := blockID + s.getCreateBundleInterval()
s.bundleDetail = &curBundleDetail{
name: types.GetBundleName(startBlockID, endBlockID),
startBlockID: startBlockID,
finalizeBlockID: endBlockID,
}
}
return nil
}
Expand Down Expand Up @@ -349,7 +353,7 @@ func (s *BlobSyncer) LoadProgressAndResume(nextBlockID uint64) error {
)
finalizingBundle, err := s.blobDao.GetLatestFinalizingBundle()
if err != nil {
if err != gorm.ErrRecordNotFound {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
// There is no pending(finalizing) bundle, start a new bundle. e.g. a bundle includes
Expand Down

0 comments on commit 4e2253f

Please sign in to comment.