Skip to content

Commit

Permalink
resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
liam.lai committed Dec 10, 2024
1 parent d444487 commit 731f34a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
11 changes: 9 additions & 2 deletions consensus/XDPoS/XDPoS.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
const (
ExtraFieldCheck = true
SkipExtraFieldCheck = false
newRoundChanSize = 1
)

func (x *XDPoS) SigHash(header *types.Header) (hash common.Hash) {
Expand All @@ -64,6 +65,8 @@ type XDPoS struct {
// Share Channel
MinePeriodCh chan int // Miner wait Period Channel

NewRoundCh chan types.Round // Miner use this channel to trigger worker to commitNewWork

// Trading and lending service
GetXDCXService func() utils.TradingService
GetLendingService func() utils.LendingService
Expand Down Expand Up @@ -104,6 +107,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS {
log.Info("xdc config loading", "v2 config", config.V2)

minePeriodCh := make(chan int)
newRoundCh := make(chan types.Round, newRoundChanSize)

// Allocate the snapshot caches and create the engine
signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit)
Expand All @@ -113,10 +117,11 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS {
db: db,

MinePeriodCh: minePeriodCh,
NewRoundCh: newRoundCh,

signingTxsCache: signingTxsCache,
EngineV1: engine_v1.New(chainConfig, db),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh),
}
}

Expand All @@ -131,6 +136,7 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
}

minePeriodCh := make(chan int)
newRoundCh := make(chan types.Round, newRoundChanSize)

// Allocate the snapshot caches and create the engine
signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit)
Expand All @@ -140,13 +146,14 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
db: db,

MinePeriodCh: minePeriodCh,
NewRoundCh: newRoundCh,

GetXDCXService: func() utils.TradingService { return nil },
GetLendingService: func() utils.LendingService { return nil },

signingTxsCache: signingTxsCache,
EngineV1: engine_v1.NewFaker(db, chainConfig),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh),
}
return fakeEngine
}
Expand Down
11 changes: 10 additions & 1 deletion consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type XDPoS_v2 struct {

BroadcastCh chan interface{}
minePeriodCh chan int
newRoundCh chan types.Round

timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached
timeoutCount int // number of timeout being sent
Expand All @@ -71,7 +72,7 @@ type XDPoS_v2 struct {
votePoolCollectionTime time.Time
}

func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int) *XDPoS_v2 {
func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int, newRoundCh chan types.Round) *XDPoS_v2 {
config := chainConfig.XDPoS
// Setup timeoutTimer
duration := time.Duration(config.V2.CurrentConfig.TimeoutPeriod) * time.Second
Expand Down Expand Up @@ -100,6 +101,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i
timeoutWorker: timeoutTimer,
BroadcastCh: make(chan interface{}),
minePeriodCh: minePeriodCh,
newRoundCh: newRoundCh,

round2epochBlockInfo: round2epochBlockInfo,

Expand Down Expand Up @@ -902,6 +904,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo
1. Set currentRound = QC round + 1 (or TC round +1)
2. Reset timer
3. Reset vote and timeout Pools
4. Send signal to miner
*/
func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round types.Round) {
log.Info("[setNewRound] new round and reset pools and workers", "round", round)
Expand All @@ -911,6 +914,12 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ
x.timeoutPool.Clear()
// don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break
// for example round gets bump during collecting vote, so we have to keep vote.

// send signal to newRoundCh, but if full don't send
select {
case x.newRoundCh <- round:
default:
}
}

func (x *XDPoS_v2) broadcastToBftChannel(msg interface{}) {
Expand Down
44 changes: 40 additions & 4 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -116,7 +117,9 @@ type worker struct {
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
wg sync.WaitGroup
resetCh chan time.Duration // Channel to request timer resets

wg sync.WaitGroup

agents map[Agent]struct{}
recv chan *Result
Expand Down Expand Up @@ -158,6 +161,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
resetCh: make(chan time.Duration, 1),
chainDb: eth.ChainDb(),
recv: make(chan *Result, resultQueueSize),
chain: eth.BlockChain(),
Expand Down Expand Up @@ -273,6 +277,7 @@ func (w *worker) update() {
minePeriod := 2
MinePeriodCh := w.engine.(*XDPoS.XDPoS).MinePeriodCh
defer close(MinePeriodCh)
NewRoundCh := w.engine.(*XDPoS.XDPoS).NewRoundCh

timeout := time.NewTimer(time.Duration(minePeriod) * time.Second)
c := make(chan struct{})
Expand All @@ -283,6 +288,16 @@ func (w *worker) update() {
for {
// A real event arrived, process interesting content
select {
case d := <-w.resetCh:
// Reset the timer to the new duration.
if !timeout.Stop() {
// Drain the timer channel if it had already expired.
select {
case <-timeout.C:
default:
}
}
timeout.Reset(d)
case <-timeout.C:
c <- struct{}{}
case <-finish:
Expand All @@ -296,18 +311,26 @@ func (w *worker) update() {
case v := <-MinePeriodCh:
log.Info("[worker] update wait period", "period", v)
minePeriod = v
timeout.Reset(time.Duration(minePeriod) * time.Second)
w.resetCh <- time.Duration(minePeriod) * time.Second

case <-c:
if atomic.LoadInt32(&w.mining) == 1 {
w.commitNewWork()
}
timeout.Reset(time.Duration(minePeriod) * time.Second)
resetTime := getResetTime(w.chain, minePeriod)
w.resetCh <- resetTime

// Handle ChainHeadEvent
case <-w.chainHeadCh:
w.commitNewWork()
timeout.Reset(time.Duration(minePeriod) * time.Second)
resetTime := getResetTime(w.chain, minePeriod)
w.resetCh <- resetTime

// Handle new round
case <-NewRoundCh:
w.commitNewWork()
resetTime := getResetTime(w.chain, minePeriod)
w.resetCh <- resetTime

// Handle ChainSideEvent
case <-w.chainSideCh:
Expand Down Expand Up @@ -354,6 +377,19 @@ func (w *worker) update() {
}
}

func getResetTime(chain *core.BlockChain, minePeriod int) time.Duration {
minePeriodDuration := time.Duration(minePeriod) * time.Second
currentBlockTime := chain.CurrentBlock().Time().Int64()
nowTime := time.Now().UnixMilli()
resetTime := time.Duration(currentBlockTime)*time.Second + minePeriodDuration - time.Duration(nowTime)*time.Millisecond
// in case the current block time is not very accurate
if resetTime > minePeriodDuration || resetTime <= 0 {
resetTime = minePeriodDuration
}
log.Debug("[update] Miner worker timer reset", "resetMilliseconds", resetTime.Milliseconds(), "minePeriodSec", minePeriod, "currentBlockTimeSec", fmt.Sprintf("%d", currentBlockTime), "currentSystemTimeSec", fmt.Sprintf("%d.%03d", nowTime/1000, nowTime%1000))
return resetTime
}

func (w *worker) wait() {
for {
mustCommitNewWork := true
Expand Down

0 comments on commit 731f34a

Please sign in to comment.