Skip to content

Commit

Permalink
Refactor of phCache and head picking to always choose the greatest Td…
Browse files Browse the repository at this point in the history
… in zones:

This is a departure from the top-down picking originally proposed by HLCR. Only zones
have sufficient information to decide which block to keep mining.  The
hierarchical constraint is now statistically enforced by setting subordinate Tds to
dominant values on coincident blocks.  Note that this is a statistical enforcement
and not a strict enforcement as previously proposed. This appears to create a much
more robust head pick that never stalls or separates.

Coordinate updates coerce the ph pick to the result of the coordinate coincident
dom head choice.  The dom and its head choice are necessarily the correct choice
because the dom is always shared.

Renamed phHashKey, added deepcopies around phcache writes, combined writePh and pickPh into writePhHeaderAndPickPhHead
  • Loading branch information
kiltsonfire authored and gameofpointers committed Feb 14, 2023
1 parent bea8a40 commit 8e832c9
Showing 1 changed file with 57 additions and 88 deletions.
145 changes: 57 additions & 88 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ const (
maxPendingEtxBlocks = 256
pendingHeaderCacheLimit = 500
pendingHeaderGCTime = 5

// Termini Index reference to the index of Termini struct that has the
// previous coincident block hash
terminiIndex = 3
TerminusIndex = 3
)

type Slice struct {
Expand Down Expand Up @@ -58,8 +55,8 @@ type Slice struct {

phCachemu sync.RWMutex

pendingHeaderHeadHash common.Hash
phCache map[common.Hash]types.PendingHeader
bestPhKey common.Hash
phCache map[common.Hash]types.PendingHeader

validator Validator // Block and state validator interface
}
Expand Down Expand Up @@ -113,14 +110,15 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, isLocal

// Append takes a proposed header and constructs a local block and attempts to hierarchically append it to the block graph.
// If this is called from a dominant context a domTerminus must be provided else a common.Hash{} should be used and domOrigin should be set to true.
func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, domTerminus common.Hash, td *big.Int, domOrigin bool, reorg bool, newInboundEtxs types.Transactions) ([]types.Transactions, error) {
func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, domTerminus common.Hash, domTd *big.Int, domOrigin bool, reorg bool, newInboundEtxs types.Transactions) ([]types.Transactions, error) {
// The compute and write of the phCache is split starting here so we need to get the lock
sl.phCachemu.Lock()
defer sl.phCachemu.Unlock()

log.Info("Starting slice append", "hash", header.Hash(), "number", header.NumberArray(), "location", header.Location(), "parent hash", header.ParentHash())

nodeCtx := common.NodeLocation.Context()
location := header.Location()
isDomCoincident := sl.engine.IsDomCoincident(header)

// Don't append the block which already exists in the database.
if sl.hc.HasHeader(header.Hash(), header.NumberU64()) {
Expand All @@ -146,20 +144,18 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
return nil, err
}

log.Info("Starting slice append", "hash", block.Hash(), "number", block.Header().NumberArray(), "location", block.Header().Location(), "parent hash", block.ParentHash())

batch := sl.sliceDb.NewBatch()

// Run Previous Coincident Reference Check (PCRC)
domTerminus, newTermini, err := sl.pcrc(batch, block.Header(), domTerminus)
domTerminus, newTermini, err := sl.pcrc(batch, block.Header(), domTerminus, domOrigin)
if err != nil {
return nil, err
}

// If this was a coincident block, our dom will be passing us a set of newly confirmed ETXs
// If this is not a coincident block, we need to build up the list of confirmed ETXs using the subordinate manifest
subRollup := types.Transactions{}
if !isDomCoincident {
if !domOrigin {
newInboundEtxs, subRollup, err = sl.CollectNewlyConfirmedEtxs(block, block.Location())
if err != nil {
log.Debug("Error collecting newly confirmed etxs: ", "err", err)
Expand All @@ -179,12 +175,12 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
return nil, err
}

// CalcTd on the new block
td, err := sl.calcTd(block.Header(), domTd, domOrigin)
if err != nil {
return nil, err
}
if !domOrigin {
// CalcTd on the new block
td, err = sl.calcTd(block.Header())
if err != nil {
return nil, err
}
// HLCR
reorg = sl.hlcr(td)
}
Expand Down Expand Up @@ -253,11 +249,10 @@ func (sl *Slice) Append(header *types.Header, domPendingHeader *types.Header, do
return nil, err
}

sl.writeToPhCache(pendingHeaderWithTermini)
updateMiner := sl.pickPhCacheHead(reorg, pendingHeaderWithTermini, domOrigin)
sl.writeToPhCacheAndPickPhHead(reorg, pendingHeaderWithTermini, true, 3)

// Relay the new pendingHeader
sl.relayPh(pendingHeaderWithTermini, updateMiner, reorg, domOrigin, block.Location())
sl.relayPh(pendingHeaderWithTermini, reorg, domOrigin, block.Location())

log.Info("Appended new block", "number", block.Header().Number(), "hash", block.Hash(),
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "etxs", len(block.ExtTransactions()), "gas", block.GasUsed(),
Expand All @@ -279,13 +274,14 @@ func (sl *Slice) backfillPETXs(header *types.Header, subManifest types.BlockMani
}

// relayPh sends pendingHeaderWithTermini to subordinates
func (sl *Slice) relayPh(pendingHeaderWithTermini types.PendingHeader, updateMiner bool, reorg bool, domOrigin bool, location common.Location) {
func (sl *Slice) relayPh(pendingHeaderWithTermini types.PendingHeader, reorg bool, domOrigin bool, location common.Location) {
nodeCtx := common.NodeLocation.Context()

if nodeCtx == common.ZONE_CTX {
if updateMiner {
sl.phCache[sl.pendingHeaderHeadHash].Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(sl.phCache[sl.pendingHeaderHeadHash].Header)
bestPh, exists := sl.phCache[sl.bestPhKey]
if exists {
bestPh.Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header)
return
}
} else if !domOrigin {
Expand Down Expand Up @@ -405,12 +401,10 @@ func (sl *Slice) setHeaderChainHead(batch ethdb.Batch, block *types.Block, reorg
}

// PCRC previous coincidence reference check makes sure there are not any cyclic references in the graph and calculates new termini and the block terminus
func (sl *Slice) pcrc(batch ethdb.Batch, header *types.Header, domTerminus common.Hash) (common.Hash, []common.Hash, error) {
func (sl *Slice) pcrc(batch ethdb.Batch, header *types.Header, domTerminus common.Hash, domOrigin bool) (common.Hash, []common.Hash, error) {
nodeCtx := common.NodeLocation.Context()
location := header.Location()

isDomCoincident := sl.engine.IsDomCoincident(header)

log.Debug("PCRC:", "Parent Hash:", header.ParentHash(), "Number", header.Number, "Location:", header.Location())
termini := sl.hc.GetTerminiByHash(header.ParentHash())

Expand All @@ -434,15 +428,15 @@ func (sl *Slice) pcrc(batch ethdb.Batch, header *types.Header, domTerminus commo
}

// Set the terminus
if nodeCtx == common.PRIME_CTX || isDomCoincident {
newTermini[terminiIndex] = header.Hash()
if nodeCtx == common.PRIME_CTX || domOrigin {
newTermini[TerminusIndex] = header.Hash()
} else {
newTermini[terminiIndex] = termini[terminiIndex]
newTermini[TerminusIndex] = termini[TerminusIndex]
}

// Check for a graph cyclic reference
if isDomCoincident {
if termini[terminiIndex] != domTerminus {
if domOrigin {
if termini[TerminusIndex] != domTerminus {
log.Warn("Cyclic Block:", "block number", header.NumberArray(), "hash", header.Hash(), "terminus", domTerminus, "termini", termini)
return common.Hash{}, []common.Hash{}, errors.New("termini do not match, block rejected due to cyclic reference")
}
Expand All @@ -469,23 +463,26 @@ func (sl *Slice) hlcr(externTd *big.Int) bool {
}

// CalcTd calculates the TD of the given header using PCRC.
func (sl *Slice) calcTd(header *types.Header) (*big.Int, error) {
// Stop from
isDomCoincident := sl.engine.IsDomCoincident(header)
if isDomCoincident {
return nil, errors.New("td on a dom block cannot be calculated by a sub")
}
func (sl *Slice) calcTd(header *types.Header, domTd *big.Int, domOrigin bool) (*big.Int, error) {
priorTd := sl.hc.GetTd(header.ParentHash(), header.NumberU64()-1)
if priorTd == nil {
return nil, consensus.ErrFutureBlock
}

Td := priorTd.Add(priorTd, header.Difficulty())

if domOrigin {
// If its a dom block we don't compute the td, instead just return the
// td given by dom
return domTd, nil
}

return Td, nil
}

// GetPendingHeader is used by the miner to request the current pending header
func (sl *Slice) GetPendingHeader() (*types.Header, error) {
if ph := sl.phCache[sl.pendingHeaderHeadHash].Header; ph != nil {
if ph := sl.phCache[sl.bestPhKey].Header; ph != nil {
return ph, nil
} else {
return nil, errors.New("empty pending header")
Expand Down Expand Up @@ -554,8 +551,9 @@ func (sl *Slice) SubRelayPendingHeader(pendingHeader types.PendingHeader, reorg
if err != nil {
return
}
bestPh, exists := sl.phCache[sl.pendingHeaderHeadHash]
bestPh, exists := sl.phCache[sl.bestPhKey]
if exists {
bestPh.Header.SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header)
}
}
Expand All @@ -567,7 +565,7 @@ func (sl *Slice) computePendingHeader(localPendingHeaderWithTermini types.Pendin
nodeCtx := common.NodeLocation.Context()

var cachedPendingHeaderWithTermini types.PendingHeader
hash := localPendingHeaderWithTermini.Termini[terminiIndex]
hash := localPendingHeaderWithTermini.Termini[TerminusIndex]
cachedPendingHeaderWithTermini, exists := sl.phCache[hash]
var newPh *types.Header

Expand All @@ -586,69 +584,38 @@ func (sl *Slice) computePendingHeader(localPendingHeaderWithTermini types.Pendin
// updatePhCacheFromDom combines the recieved pending header with the pending header stored locally at a given terminus for specified context
func (sl *Slice) updatePhCacheFromDom(pendingHeader types.PendingHeader, terminiIndex int, indices []int, reorg bool) error {

var localPendingHeader types.PendingHeader
hash := pendingHeader.Termini[terminiIndex]
localPendingHeader, exists := sl.phCache[hash]

if exists {
combinedPendingHeader := types.CopyHeader(localPendingHeader.Header)
for _, i := range indices {
localPendingHeader.Header = sl.combinePendingHeader(pendingHeader.Header, localPendingHeader.Header, i)
combinedPendingHeader = sl.combinePendingHeader(pendingHeader.Header, combinedPendingHeader, i)
}
localPendingHeader.Header.SetLocation(common.NodeLocation)
sl.phCache[hash] = localPendingHeader
combinedPendingHeader.SetLocation(common.NodeLocation)

sl.writeToPhCacheAndPickPhHead(reorg, types.PendingHeader{Header: combinedPendingHeader, Termini: localPendingHeader.Termini}, false, 3)

if reorg {
sl.pendingHeaderHeadHash = hash
}
return nil
}
log.Warn("no pending header found for", "terminus", hash)
return errors.New("no pending header found in cache")
}

// writePhCache dom writes a given pendingHeaderWithTermini to the cache with the terminus used as the key.
func (sl *Slice) writeToPhCache(pendingHeaderWithTermini types.PendingHeader) {
sl.phCache[pendingHeaderWithTermini.Termini[terminiIndex]] = pendingHeaderWithTermini
}

// pickPhCacheHead determines if the provided pendingHeader should be selected and returns true if selected
func (sl *Slice) pickPhCacheHead(reorg bool, externPendingHeaderWithTermini types.PendingHeader, domOrigin bool) bool {
func (sl *Slice) writeToPhCacheAndPickPhHead(reorg bool, pendingHeaderWithTermini types.PendingHeader, local bool, terminiIndex int) {
deepCopyPendingHeaderWithTermini := types.PendingHeader{Header: types.CopyHeader(pendingHeaderWithTermini.Header), Termini: pendingHeaderWithTermini.Termini}
//Only write iff our context is better than current ie > td
if reorg {
sl.pendingHeaderHeadHash = externPendingHeaderWithTermini.Termini[terminiIndex]
return true
sl.phCache[pendingHeaderWithTermini.Termini[terminiIndex]] = deepCopyPendingHeaderWithTermini
sl.bestPhKey = pendingHeaderWithTermini.Termini[terminiIndex]
}

localPendingHeader, exists := sl.phCache[sl.pendingHeaderHeadHash]

if domOrigin {
//calc local cache head reorg
localCacheReorg := true
for i := 0; i < common.NodeLocation.Context(); i++ {
localCacheReorg = (externPendingHeaderWithTermini.Header.NumberArray()[i].Cmp(localPendingHeader.Header.NumberArray()[i]) >= 0) && localCacheReorg
}

if exists && localCacheReorg && (externPendingHeaderWithTermini.Header.NumberU64() > localPendingHeader.Header.NumberU64()) {
return sl.updateCurrentPendingHeader(externPendingHeaderWithTermini)
}
_, exist := sl.phCache[pendingHeaderWithTermini.Termini[terminiIndex]]
if !exist {
sl.phCache[pendingHeaderWithTermini.Termini[terminiIndex]] = deepCopyPendingHeaderWithTermini
}

return false
}

// updateCurrentPendingHeader compares the externPh parent td to the sl.pendingHeader parent td and sets sl.pendingHeader to the exterPh if the td is greater
func (sl *Slice) updateCurrentPendingHeader(externPendingHeader types.PendingHeader) bool {
externTd := sl.hc.GetTdByHash(externPendingHeader.Header.ParentHash())
currentTd := sl.hc.GetTdByHash(sl.phCache[sl.pendingHeaderHeadHash].Header.ParentHash())
log.Debug("updateCurrentPendingHeader:", "currentParent:", sl.phCache[sl.pendingHeaderHeadHash].Header.ParentHash(), "currentTd:", currentTd, "externParent:", externPendingHeader.Header.ParentHash(), "externTd:", externTd)
if currentTd != nil && externTd != nil {
if currentTd.Cmp(externTd) < 0 {
sl.pendingHeaderHeadHash = externPendingHeader.Termini[terminiIndex]
}
} else {
log.Warn("updateCurrentPendingHeader:", "currentParent:", sl.phCache[sl.pendingHeaderHeadHash].Header.ParentHash(), "currentTd:", currentTd, "externParent:", externPendingHeader.Header.ParentHash(), "externTd:", externTd)
return false
}
return true
}

// init checks if the headerchain is empty and if it's empty appends the Knot
Expand All @@ -672,7 +639,6 @@ func (sl *Slice) init(genesis *Genesis) error {
if sl.hc.Empty() {
// Initialize slice state for genesis knot
genesisTermini := []common.Hash{genesisHash, genesisHash, genesisHash, genesisHash}
sl.pendingHeaderHeadHash = genesisHash
rawdb.WriteTermini(sl.sliceDb, genesisHash, genesisTermini)

// Append each of the knot blocks
Expand All @@ -683,14 +649,17 @@ func (sl *Slice) init(genesis *Genesis) error {
if block != nil {
location := block.Header().Location()
if nodeCtx == common.PRIME_CTX {
sl.bestPhKey = block.Hash()
rawdb.WriteCandidateBody(sl.sliceDb, block.Hash(), block.Body())
_, err := sl.Append(block.Header(), types.EmptyHeader(), genesisHash, block.Difficulty(), false, false, nil)
if err != nil {
log.Warn("Failed to append block", "hash:", block.Hash(), "Number:", block.Number(), "Location:", block.Header().Location(), "error:", err)
}
} else if location.Region() == common.NodeLocation.Region() && len(common.NodeLocation) == common.REGION_CTX {
sl.bestPhKey = block.Hash()
rawdb.WriteCandidateBody(sl.sliceDb, block.Hash(), block.Body())
} else if bytes.Equal(location, common.NodeLocation) {
sl.bestPhKey = block.Hash()
rawdb.WriteCandidateBody(sl.sliceDb, block.Hash(), block.Body())
}
}
Expand Down Expand Up @@ -874,14 +843,14 @@ func (sl *Slice) updatePendingHeadersCache() {
// loadLastState loads the phCache and the slice pending header hash from the db.
func (sl *Slice) loadLastState() error {
sl.phCache = rawdb.ReadPhCache(sl.sliceDb)
sl.pendingHeaderHeadHash = rawdb.ReadCurrentPendingHeaderHash(sl.sliceDb)
sl.bestPhKey = rawdb.ReadCurrentPendingHeaderHash(sl.sliceDb)
return nil
}

// Stop stores the phCache and the sl.pendingHeader hash value to the db.
func (sl *Slice) Stop() {
// write the ph head hash to the db.
rawdb.WriteCurrentPendingHeaderHash(sl.sliceDb, sl.pendingHeaderHeadHash)
rawdb.WriteCurrentPendingHeaderHash(sl.sliceDb, sl.bestPhKey)
// Write the ph cache to the dd.
rawdb.WritePhCache(sl.sliceDb, sl.phCache)

Expand Down

0 comments on commit 8e832c9

Please sign in to comment.