Skip to content

Commit

Permalink
GetPendingEtxs/Rollup from Sub if not found even after sub sending it…
Browse files Browse the repository at this point in the history
… ahead of time
  • Loading branch information
gameofpointers committed Sep 23, 2023
1 parent 209c0b2 commit 9be7a33
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 15 deletions.
8 changes: 8 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,14 @@ func (c *Core) GetPendingEtxsRollup(hash common.Hash) *types.PendingEtxsRollup {
return rawdb.ReadPendingEtxsRollup(c.sl.sliceDb, hash)
}

func (c *Core) GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) {
return c.sl.GetPendingEtxsRollupFromSub(hash, location)
}

func (c *Core) GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) {
return c.sl.GetPendingEtxsFromSub(hash, location)
}

func (c *Core) HasPendingEtxs(hash common.Hash) bool {
return c.GetPendingEtxs(hash) != nil
}
Expand Down
50 changes: 37 additions & 13 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ const (
primeHorizonThreshold = 20
)

// getPendingEtxsRollup gets the pendingEtxsRollup rollup form appropriate Region
type getPendingEtxsRollup func(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error)

// getPendingEtxs gets the pendingEtxs from the appropriate Zone
type getPendingEtxs func(hash common.Hash, location common.Location) (types.PendingEtxs, error)

type HeaderChain struct {
config *params.ChainConfig

Expand All @@ -50,6 +56,9 @@ type HeaderChain struct {
headerCache *lru.Cache // Cache for the most recent block headers
numberCache *lru.Cache // Cache for the most recent block numbers

fetchPEtxRollup getPendingEtxsRollup
fetchPEtx getPendingEtxs

pendingEtxsRollup *lru.Cache
pendingEtxs *lru.Cache
blooms *lru.Cache
Expand All @@ -65,17 +74,19 @@ type HeaderChain struct {

// NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points
// to the parent's interrupt semaphore.
func NewHeaderChain(db ethdb.Database, engine consensus.Engine, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, txLookupLimit *uint64, vmConfig vm.Config, slicesRunning []common.Location) (*HeaderChain, error) {
func NewHeaderChain(db ethdb.Database, engine consensus.Engine, pEtxsRollupFetcher getPendingEtxsRollup, pEtxsFetcher getPendingEtxs, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, txLookupLimit *uint64, vmConfig vm.Config, slicesRunning []common.Location) (*HeaderChain, error) {
headerCache, _ := lru.New(headerCacheLimit)
numberCache, _ := lru.New(numberCacheLimit)

hc := &HeaderChain{
config: chainConfig,
headerDb: db,
headerCache: headerCache,
numberCache: numberCache,
engine: engine,
slicesRunning: slicesRunning,
config: chainConfig,
headerDb: db,
headerCache: headerCache,
numberCache: numberCache,
engine: engine,
slicesRunning: slicesRunning,
fetchPEtxRollup: pEtxsRollupFetcher,
fetchPEtx: pEtxsFetcher,
}

pendingEtxsRollup, _ := lru.New(c_maxPendingEtxsRollup)
Expand Down Expand Up @@ -130,21 +141,34 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
for _, pEtxHash := range pEtxRollup.Manifest {
pendingEtxs, err := hc.GetPendingEtxs(pEtxHash)
if err != nil {
// Start backfilling the missing pending ETXs needed to process this block
return nil, ErrPendingEtxNotFound
// Get the pendingEtx from the appropriate zone
pEtx, err := hc.fetchPEtx(pEtxHash, pEtxRollup.Header.Location())
if err != nil {
return nil, ErrPendingEtxNotFound
} else {
hc.AddPendingEtxs(pEtx)
pendingEtxs = &pEtx
}
}
subRollup = append(subRollup, pendingEtxs.Etxs...)
}
} else {
// Start backfilling the missing pending ETXs needed to process this block
// Try to get the pending etx from the Regions
hc.fetchPEtxRollup(hash, b.Location())
return nil, ErrPendingEtxNotFound
}
// Region works normally as before collecting pendingEtxs for each hash in the manifest
} else if nodeCtx == common.REGION_CTX {
pendingEtxs, err := hc.GetPendingEtxs(hash)
if err != nil {
// Start backfilling the missing pending ETXs needed to process this block
return nil, ErrPendingEtxNotFound
// Get the pendingEtx from the appropriate zone
pEtx, err := hc.fetchPEtx(hash, b.Header().Location())
if err != nil {
return nil, ErrPendingEtxNotFound
} else {
hc.AddPendingEtxs(pEtx)
pendingEtxs = &pEtx
}
}
subRollup = append(subRollup, pendingEtxs.Etxs...)
}
Expand Down Expand Up @@ -183,7 +207,7 @@ func (hc *HeaderChain) GetPendingEtxsRollup(hash common.Hash) (*types.PendingEtx
rollups = *res
} else {
log.Trace("unable to find pending etxs rollups for hash in manifest", "hash:", hash.String())
return nil, ErrPendingEtxNotFound
return nil, ErrPendingEtxRollupNotFound
}
return &rollups, nil
}
Expand Down
40 changes: 39 additions & 1 deletion core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku
}

var err error
sl.hc, err = NewHeaderChain(db, engine, chainConfig, cacheConfig, txLookupLimit, vmConfig, slicesRunning)
sl.hc, err = NewHeaderChain(db, engine, sl.GetPendingEtxsRollupFromSub, sl.GetPendingEtxsFromSub, chainConfig, cacheConfig, txLookupLimit, vmConfig, slicesRunning)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -676,6 +676,44 @@ func (sl *Slice) SendPendingEtxsToDom(pEtxs types.PendingEtxs) error {
return sl.domClient.SendPendingEtxsToDom(context.Background(), pEtxs)
}

// GetPendingEtxsRollupFromSub gets the pending etxs rollup from the appropriate prime
func (sl *Slice) GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) {
// Only allowed to be called in Region
nodeCtx := common.NodeLocation.Context()
if nodeCtx == common.PRIME_CTX {
if sl.subClients[location.SubIndex()] != nil {
pEtxRollup, err := sl.subClients[location.SubIndex()].GetPendingEtxsRollupFromSub(context.Background(), hash, location)
if err != nil {
return types.PendingEtxsRollup{}, err
} else {
sl.AddPendingEtxsRollup(pEtxRollup)
}
}
} else if nodeCtx == common.REGION_CTX {
block := sl.hc.GetBlockOrCandidateByHash(hash)
if block != nil {
return types.PendingEtxsRollup{Header: block.Header(), Manifest: block.SubManifest()}, nil
}
}
return types.PendingEtxsRollup{}, ErrPendingEtxNotFound
}

// GetPendingEtxsFromSub gets the pending etxs from the appropriate prime
func (sl *Slice) GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) {
// Only allowed to be called in Region
nodeCtx := common.NodeLocation.Context()
if nodeCtx != common.ZONE_CTX {
if sl.subClients[location.SubIndex()] != nil {
return sl.subClients[location.SubIndex()].GetPendingEtxsFromSub(context.Background(), hash, location)
}
}
block := sl.hc.GetBlockOrCandidateByHash(hash)
if block != nil {
return types.PendingEtxs{Header: block.Header(), Etxs: block.ExtTransactions()}, nil
}
return types.PendingEtxs{}, ErrPendingEtxNotFound
}

// SubRelayPendingHeader takes a pending header from the sender (ie dominant), updates the phCache with a composited header and relays result to subordinates
func (sl *Slice) SubRelayPendingHeader(pendingHeader types.PendingHeader, newEntropy *big.Int, location common.Location, subReorg bool, order int) {
nodeCtx := common.NodeLocation.Context()
Expand Down
8 changes: 8 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,3 +515,11 @@ func (b *QuaiAPIBackend) SubscribePendingHeaderEvent(ch chan<- *types.Header) ev
func (b *QuaiAPIBackend) GenerateRecoveryPendingHeader(pendingHeader *types.Header, checkpointHashes types.Termini) error {
return b.eth.core.GenerateRecoveryPendingHeader(pendingHeader, checkpointHashes)
}

func (b *QuaiAPIBackend) GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) {
return b.eth.core.GetPendingEtxsRollupFromSub(hash, location)
}

func (b *QuaiAPIBackend) GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) {
return b.eth.core.GetPendingEtxsFromSub(hash, location)
}
4 changes: 3 additions & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block) er
return nil
}
// Schedule the block for import
h.blockFetcher.Enqueue(peer.ID(), block)
if err := h.blockFetcher.Enqueue(peer.ID(), block); err != nil {
return err
}

if block != nil && !h.broadcastCache.Contains(block.Hash()) {
log.Info("Received Block Broadcast", "Hash", block.Hash(), "Number", block.Header().NumberArray())
Expand Down
2 changes: 2 additions & 0 deletions internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type Backend interface {
AddPendingEtxsRollup(pEtxsRollup types.PendingEtxsRollup) error
PendingBlockAndReceipts() (*types.Block, types.Receipts)
GenerateRecoveryPendingHeader(pendingHeader *types.Header, checkpointHashes types.Termini) error
GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error)
GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error)
ProcessingState() bool

// Transaction pool API
Expand Down
42 changes: 42 additions & 0 deletions internal/quaiapi/quai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,3 +771,45 @@ func (s *PublicBlockChainQuaiAPI) GenerateRecoveryPendingHeader(ctx context.Cont
}
return s.b.GenerateRecoveryPendingHeader(pHandcheckPointHashes.PendingHeader, pHandcheckPointHashes.CheckpointHashes)
}

type GetPendingEtxsRollupFuncArgs struct {
Hash common.Hash
Location common.Location
}

func (s *PublicBlockChainQuaiAPI) GetPendingEtxsRollupFromSub(ctx context.Context, raw json.RawMessage) (map[string]interface{}, error) {
var getPEtxsRollup GetPendingEtxsFuncArgs
if err := json.Unmarshal(raw, &getPEtxsRollup); err != nil {
return nil, err
}
pEtxsRollup, err := s.b.GetPendingEtxsRollupFromSub(getPEtxsRollup.Hash, getPEtxsRollup.Location)
if err != nil {
return nil, err
}
fields := make(map[string]interface{})
fields["header"] = pEtxsRollup.Header.RPCMarshalHeader()
fields["manifest"] = pEtxsRollup.Manifest

return fields, nil
}

type GetPendingEtxsFuncArgs struct {
Hash common.Hash
Location common.Location
}

func (s *PublicBlockChainQuaiAPI) GetPendingEtxsFromSub(ctx context.Context, raw json.RawMessage) (map[string]interface{}, error) {
var getPEtxs GetPendingEtxsFuncArgs
if err := json.Unmarshal(raw, &getPEtxs); err != nil {
return nil, err
}
pEtxs, err := s.b.GetPendingEtxsFromSub(getPEtxs.Hash, getPEtxs.Location)
if err != nil {
return nil, err
}
fields := make(map[string]interface{})
fields["header"] = pEtxs.Header.RPCMarshalHeader()
fields["etxs"] = pEtxs.Etxs

return fields, nil
}
38 changes: 38 additions & 0 deletions quaiclient/quaiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,44 @@ func (ec *Client) GetManifest(ctx context.Context, blockHash common.Hash) (types
return manifest, nil
}

// GetPendingEtxsRollupFromSub gets the pendingEtxsRollup from the region
func (ec *Client) GetPendingEtxsRollupFromSub(ctx context.Context, hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) {
fields := make(map[string]interface{})
fields["Hash"] = hash
fields["Location"] = location

var raw json.RawMessage
err := ec.c.CallContext(ctx, &raw, "quai_getPendingEtxsRollupFromSub", fields)
if err != nil {
return types.PendingEtxsRollup{}, err
}

var pEtxsRollup types.PendingEtxsRollup
if err := json.Unmarshal(raw, &pEtxsRollup); err != nil {
return types.PendingEtxsRollup{}, err
}
return pEtxsRollup, nil
}

// GetPendingEtxsFromSub gets the pendingEtxsRollup from the region
func (ec *Client) GetPendingEtxsFromSub(ctx context.Context, hash common.Hash, location common.Location) (types.PendingEtxs, error) {
fields := make(map[string]interface{})
fields["Hash"] = hash
fields["Location"] = location

var raw json.RawMessage
err := ec.c.CallContext(ctx, &raw, "quai_getPendingEtxsFromSub", fields)
if err != nil {
return types.PendingEtxs{}, err
}

var pEtxs types.PendingEtxs
if err := json.Unmarshal(raw, &pEtxs); err != nil {
return types.PendingEtxs{}, err
}
return pEtxs, nil
}

func (ec *Client) SendPendingEtxsToDom(ctx context.Context, pEtxs types.PendingEtxs) error {
fields := make(map[string]interface{})
fields["header"] = pEtxs.Header.RPCMarshalHeader()
Expand Down

0 comments on commit 9be7a33

Please sign in to comment.