Skip to content

Commit

Permalink
fix: make the file writing to be idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Aug 19, 2024
1 parent 7a35d28 commit 57dc34f
Showing 1 changed file with 40 additions and 28 deletions.
68 changes: 40 additions & 28 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ const (
BundleStatusCreatedOnChain = 2
BundleStatusSealedOnChain = 3

LoopSleepTime = 10 * time.Millisecond
BSCPauseTime = 3 * time.Second
LoopSleepTime = 10 * time.Millisecond
LoopErrorPauseTime = 2 * time.Second
BSCPauseTime = 3 * time.Second

ETHPauseTime = 90 * time.Second
RPCTimeout = 20 * time.Second
Expand Down Expand Up @@ -111,6 +112,7 @@ func (s *BlobSyncer) StartLoop() {
for range syncTicker.C {
if err = s.sync(); err != nil {
logging.Logger.Errorf("failed to sync, err=%s", err.Error())
time.Sleep(LoopErrorPauseTime)
}
}
}()
Expand All @@ -119,6 +121,7 @@ func (s *BlobSyncer) StartLoop() {
for range verifyTicket.C {
if err := s.verify(); err != nil {
logging.Logger.Errorf("failed to verify, err=%s", err.Error())
time.Sleep(LoopErrorPauseTime)
}
}
}()
Expand Down Expand Up @@ -197,27 +200,30 @@ func (s *BlobSyncer) sync() error {
if err != nil {
return err
}
var dbErr error

if isForkedBlock {
dbErr = s.blobDao.SaveBlockAndBlob(&db.Block{
err := s.blobDao.SaveBlockAndBlob(&db.Block{
Slot: blockID,
BundleName: bundleName,
}, nil)
if err != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error())
return err
}
} else {
blockToSave, blobToSave, err := s.toBlockAndBlobs(block, sideCars, blockID, bundleName)
if err == nil {
err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave)
if err == nil {
metrics.SyncedBlockIDGauge.Set(float64(blockID))
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
}
if err != nil {
logging.Logger.Errorf("failed to convert to block and blobs, err=%s", err.Error())
return err
}
dbErr = err
}
if dbErr != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, dbErr.Error())
return dbErr
if err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave); err != nil {
logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error())
return err
}
logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave))
}
metrics.SyncedBlockIDGauge.Set(float64(blockID))
// update the block status to processed
if blockID == s.bundleDetail.finalizeBlockID {
// init next bundle
startBlockID := blockID + 1
Expand All @@ -240,12 +246,18 @@ func (s *BlobSyncer) process(bundleName string, blockID uint64, sidecars []*type
return err
}
}
if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil {
return err
// for idempotent
_, err = os.Stat(s.getBundleDir(bundleName))
if !os.IsNotExist(err) {
if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil {
return err
}
}
if blockID == s.bundleDetail.finalizeBlockID {
// this is idempotent
err = s.finalizeCurBundle(bundleName)
if err != nil {
logging.Logger.Errorf("failed to finalize bundle, bundle=%s, err=%s", bundleName, err.Error())
return err
}
logging.Logger.Infof("finalized bundle, bundle_name=%s, bucket_name=%s\n", bundleName, s.getBucketName())
Expand Down Expand Up @@ -298,14 +310,8 @@ func (s *BlobSyncer) finalizeBundle(bundleName, bundleDir, bundleFilePath string
return err
}
}
err = os.RemoveAll(bundleDir)
if err != nil {
return err
}
err = os.Remove(bundleFilePath)
if err != nil && !os.IsNotExist(err) {
return err
}
os.RemoveAll(bundleDir)
os.Remove(bundleFilePath)
return s.blobDao.UpdateBundleStatus(bundleName, db.Finalized)
}

Expand Down Expand Up @@ -395,7 +401,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side
blobsReturn := make([]*db.Blob, 0)

populateBlobTxDetails := func(blockNum uint64) error {
elBlock, err := s.client.BlockByNumber(context.Background(), big.NewInt(int64(blockNum)))
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
elBlock, err := s.client.BlockByNumber(ctx, big.NewInt(int64(blockNum)))
if err != nil {
return fmt.Errorf("failed to get block at height %d, err=%s", blockNum, err.Error())
}
Expand All @@ -415,7 +423,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side

switch {
case s.BSCChain():
header, err := s.client.GetBlockHeader(context.Background(), blockNumOrSlot)
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
header, err := s.client.GetBlockHeader(ctx, blockNumOrSlot)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -538,7 +548,9 @@ func (s *BlobSyncer) ETHChain() bool {

func (s *BlobSyncer) GetParams() (*cmn.VersionedParams, error) {
if s.params == nil {
params, err := s.chainClient.GetParams(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
params, err := s.chainClient.GetParams(ctx)
if err != nil {
logging.Logger.Errorf("failed to get params, err=%s", err.Error())
return nil, err
Expand Down

0 comments on commit 57dc34f

Please sign in to comment.