diff --git a/core/core.go b/core/core.go index 7e13c9e40d..ba16da0156 100644 --- a/core/core.go +++ b/core/core.go @@ -479,6 +479,14 @@ func (c *Core) GetPendingEtxsRollup(hash common.Hash) *types.PendingEtxsRollup { return rawdb.ReadPendingEtxsRollup(c.sl.sliceDb, hash) } +func (c *Core) GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) { + return c.sl.GetPendingEtxsRollupFromSub(hash, location) +} + +func (c *Core) GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) { + return c.sl.GetPendingEtxsFromSub(hash, location) +} + func (c *Core) HasPendingEtxs(hash common.Hash) bool { return c.GetPendingEtxs(hash) != nil } diff --git a/core/headerchain.go b/core/headerchain.go index 1c75f0fbcc..78a8f4de96 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -31,6 +31,12 @@ const ( primeHorizonThreshold = 20 ) +// getPendingEtxsRollup gets the pendingEtxsRollup rollup form appropriate Region +type getPendingEtxsRollup func(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) + +// getPendingEtxs gets the pendingEtxs from the appropriate Zone +type getPendingEtxs func(hash common.Hash, location common.Location) (types.PendingEtxs, error) + type HeaderChain struct { config *params.ChainConfig @@ -50,6 +56,9 @@ type HeaderChain struct { headerCache *lru.Cache // Cache for the most recent block headers numberCache *lru.Cache // Cache for the most recent block numbers + fetchPEtxRollup getPendingEtxsRollup + fetchPEtx getPendingEtxs + pendingEtxsRollup *lru.Cache pendingEtxs *lru.Cache blooms *lru.Cache @@ -65,17 +74,19 @@ type HeaderChain struct { // NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points // to the parent's interrupt semaphore. -func NewHeaderChain(db ethdb.Database, engine consensus.Engine, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, txLookupLimit *uint64, vmConfig vm.Config, slicesRunning []common.Location) (*HeaderChain, error) { +func NewHeaderChain(db ethdb.Database, engine consensus.Engine, pEtxsRollupFetcher getPendingEtxsRollup, pEtxsFetcher getPendingEtxs, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, txLookupLimit *uint64, vmConfig vm.Config, slicesRunning []common.Location) (*HeaderChain, error) { headerCache, _ := lru.New(headerCacheLimit) numberCache, _ := lru.New(numberCacheLimit) hc := &HeaderChain{ - config: chainConfig, - headerDb: db, - headerCache: headerCache, - numberCache: numberCache, - engine: engine, - slicesRunning: slicesRunning, + config: chainConfig, + headerDb: db, + headerCache: headerCache, + numberCache: numberCache, + engine: engine, + slicesRunning: slicesRunning, + fetchPEtxRollup: pEtxsRollupFetcher, + fetchPEtx: pEtxsFetcher, } pendingEtxsRollup, _ := lru.New(c_maxPendingEtxsRollup) @@ -130,21 +141,34 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err for _, pEtxHash := range pEtxRollup.Manifest { pendingEtxs, err := hc.GetPendingEtxs(pEtxHash) if err != nil { - // Start backfilling the missing pending ETXs needed to process this block - return nil, ErrPendingEtxNotFound + // Get the pendingEtx from the appropriate zone + pEtx, err := hc.fetchPEtx(pEtxHash, pEtxRollup.Header.Location()) + if err != nil { + return nil, ErrPendingEtxNotFound + } else { + hc.AddPendingEtxs(pEtx) + pendingEtxs = &pEtx + } } subRollup = append(subRollup, pendingEtxs.Etxs...) } } else { - // Start backfilling the missing pending ETXs needed to process this block + // Try to get the pending etx from the Regions + hc.fetchPEtxRollup(hash, b.Location()) return nil, ErrPendingEtxNotFound } // Region works normally as before collecting pendingEtxs for each hash in the manifest } else if nodeCtx == common.REGION_CTX { pendingEtxs, err := hc.GetPendingEtxs(hash) if err != nil { - // Start backfilling the missing pending ETXs needed to process this block - return nil, ErrPendingEtxNotFound + // Get the pendingEtx from the appropriate zone + pEtx, err := hc.fetchPEtx(hash, b.Header().Location()) + if err != nil { + return nil, ErrPendingEtxNotFound + } else { + hc.AddPendingEtxs(pEtx) + pendingEtxs = &pEtx + } } subRollup = append(subRollup, pendingEtxs.Etxs...) } @@ -183,7 +207,7 @@ func (hc *HeaderChain) GetPendingEtxsRollup(hash common.Hash) (*types.PendingEtx rollups = *res } else { log.Trace("unable to find pending etxs rollups for hash in manifest", "hash:", hash.String()) - return nil, ErrPendingEtxNotFound + return nil, ErrPendingEtxRollupNotFound } return &rollups, nil } diff --git a/core/slice.go b/core/slice.go index abc0a7c1ea..35ad276f73 100644 --- a/core/slice.go +++ b/core/slice.go @@ -84,7 +84,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku } var err error - sl.hc, err = NewHeaderChain(db, engine, chainConfig, cacheConfig, txLookupLimit, vmConfig, slicesRunning) + sl.hc, err = NewHeaderChain(db, engine, sl.GetPendingEtxsRollupFromSub, sl.GetPendingEtxsFromSub, chainConfig, cacheConfig, txLookupLimit, vmConfig, slicesRunning) if err != nil { return nil, err } @@ -676,6 +676,44 @@ func (sl *Slice) SendPendingEtxsToDom(pEtxs types.PendingEtxs) error { return sl.domClient.SendPendingEtxsToDom(context.Background(), pEtxs) } +// GetPendingEtxsRollupFromSub gets the pending etxs rollup from the appropriate prime +func (sl *Slice) GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) { + // Only allowed to be called in Region + nodeCtx := common.NodeLocation.Context() + if nodeCtx == common.PRIME_CTX { + if sl.subClients[location.SubIndex()] != nil { + pEtxRollup, err := sl.subClients[location.SubIndex()].GetPendingEtxsRollupFromSub(context.Background(), hash, location) + if err != nil { + return types.PendingEtxsRollup{}, err + } else { + sl.AddPendingEtxsRollup(pEtxRollup) + } + } + } else if nodeCtx == common.REGION_CTX { + block := sl.hc.GetBlockOrCandidateByHash(hash) + if block != nil { + return types.PendingEtxsRollup{Header: block.Header(), Manifest: block.SubManifest()}, nil + } + } + return types.PendingEtxsRollup{}, ErrPendingEtxNotFound +} + +// GetPendingEtxsFromSub gets the pending etxs from the appropriate prime +func (sl *Slice) GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) { + // Only allowed to be called in Region + nodeCtx := common.NodeLocation.Context() + if nodeCtx != common.ZONE_CTX { + if sl.subClients[location.SubIndex()] != nil { + return sl.subClients[location.SubIndex()].GetPendingEtxsFromSub(context.Background(), hash, location) + } + } + block := sl.hc.GetBlockOrCandidateByHash(hash) + if block != nil { + return types.PendingEtxs{Header: block.Header(), Etxs: block.ExtTransactions()}, nil + } + return types.PendingEtxs{}, ErrPendingEtxNotFound +} + // SubRelayPendingHeader takes a pending header from the sender (ie dominant), updates the phCache with a composited header and relays result to subordinates func (sl *Slice) SubRelayPendingHeader(pendingHeader types.PendingHeader, newEntropy *big.Int, location common.Location, subReorg bool, order int) { nodeCtx := common.NodeLocation.Context() diff --git a/eth/api_backend.go b/eth/api_backend.go index 10710c1e85..ca872babc7 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -515,3 +515,11 @@ func (b *QuaiAPIBackend) SubscribePendingHeaderEvent(ch chan<- *types.Header) ev func (b *QuaiAPIBackend) GenerateRecoveryPendingHeader(pendingHeader *types.Header, checkpointHashes types.Termini) error { return b.eth.core.GenerateRecoveryPendingHeader(pendingHeader, checkpointHashes) } + +func (b *QuaiAPIBackend) GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) { + return b.eth.core.GetPendingEtxsRollupFromSub(hash, location) +} + +func (b *QuaiAPIBackend) GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) { + return b.eth.core.GetPendingEtxsFromSub(hash, location) +} diff --git a/eth/handler_eth.go b/eth/handler_eth.go index eec5beb8f3..098d81ed7b 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -187,7 +187,9 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block) er return nil } // Schedule the block for import - h.blockFetcher.Enqueue(peer.ID(), block) + if err := h.blockFetcher.Enqueue(peer.ID(), block); err != nil { + return err + } if block != nil && !h.broadcastCache.Contains(block.Hash()) { log.Info("Received Block Broadcast", "Hash", block.Hash(), "Number", block.Header().NumberArray()) diff --git a/internal/quaiapi/backend.go b/internal/quaiapi/backend.go index 9c8ea418dd..d82c88f9dd 100644 --- a/internal/quaiapi/backend.go +++ b/internal/quaiapi/backend.go @@ -87,6 +87,8 @@ type Backend interface { AddPendingEtxsRollup(pEtxsRollup types.PendingEtxsRollup) error PendingBlockAndReceipts() (*types.Block, types.Receipts) GenerateRecoveryPendingHeader(pendingHeader *types.Header, checkpointHashes types.Termini) error + GetPendingEtxsRollupFromSub(hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) + GetPendingEtxsFromSub(hash common.Hash, location common.Location) (types.PendingEtxs, error) ProcessingState() bool // Transaction pool API diff --git a/internal/quaiapi/quai_api.go b/internal/quaiapi/quai_api.go index 90b0626795..7b7446cfb9 100644 --- a/internal/quaiapi/quai_api.go +++ b/internal/quaiapi/quai_api.go @@ -771,3 +771,45 @@ func (s *PublicBlockChainQuaiAPI) GenerateRecoveryPendingHeader(ctx context.Cont } return s.b.GenerateRecoveryPendingHeader(pHandcheckPointHashes.PendingHeader, pHandcheckPointHashes.CheckpointHashes) } + +type GetPendingEtxsRollupFuncArgs struct { + Hash common.Hash + Location common.Location +} + +func (s *PublicBlockChainQuaiAPI) GetPendingEtxsRollupFromSub(ctx context.Context, raw json.RawMessage) (map[string]interface{}, error) { + var getPEtxsRollup GetPendingEtxsFuncArgs + if err := json.Unmarshal(raw, &getPEtxsRollup); err != nil { + return nil, err + } + pEtxsRollup, err := s.b.GetPendingEtxsRollupFromSub(getPEtxsRollup.Hash, getPEtxsRollup.Location) + if err != nil { + return nil, err + } + fields := make(map[string]interface{}) + fields["header"] = pEtxsRollup.Header.RPCMarshalHeader() + fields["manifest"] = pEtxsRollup.Manifest + + return fields, nil +} + +type GetPendingEtxsFuncArgs struct { + Hash common.Hash + Location common.Location +} + +func (s *PublicBlockChainQuaiAPI) GetPendingEtxsFromSub(ctx context.Context, raw json.RawMessage) (map[string]interface{}, error) { + var getPEtxs GetPendingEtxsFuncArgs + if err := json.Unmarshal(raw, &getPEtxs); err != nil { + return nil, err + } + pEtxs, err := s.b.GetPendingEtxsFromSub(getPEtxs.Hash, getPEtxs.Location) + if err != nil { + return nil, err + } + fields := make(map[string]interface{}) + fields["header"] = pEtxs.Header.RPCMarshalHeader() + fields["etxs"] = pEtxs.Etxs + + return fields, nil +} diff --git a/quaiclient/quaiclient.go b/quaiclient/quaiclient.go index ff5d13d509..70cccb2ba8 100644 --- a/quaiclient/quaiclient.go +++ b/quaiclient/quaiclient.go @@ -157,6 +157,44 @@ func (ec *Client) GetManifest(ctx context.Context, blockHash common.Hash) (types return manifest, nil } +// GetPendingEtxsRollupFromSub gets the pendingEtxsRollup from the region +func (ec *Client) GetPendingEtxsRollupFromSub(ctx context.Context, hash common.Hash, location common.Location) (types.PendingEtxsRollup, error) { + fields := make(map[string]interface{}) + fields["Hash"] = hash + fields["Location"] = location + + var raw json.RawMessage + err := ec.c.CallContext(ctx, &raw, "quai_getPendingEtxsRollupFromSub", fields) + if err != nil { + return types.PendingEtxsRollup{}, err + } + + var pEtxsRollup types.PendingEtxsRollup + if err := json.Unmarshal(raw, &pEtxsRollup); err != nil { + return types.PendingEtxsRollup{}, err + } + return pEtxsRollup, nil +} + +// GetPendingEtxsFromSub gets the pendingEtxsRollup from the region +func (ec *Client) GetPendingEtxsFromSub(ctx context.Context, hash common.Hash, location common.Location) (types.PendingEtxs, error) { + fields := make(map[string]interface{}) + fields["Hash"] = hash + fields["Location"] = location + + var raw json.RawMessage + err := ec.c.CallContext(ctx, &raw, "quai_getPendingEtxsFromSub", fields) + if err != nil { + return types.PendingEtxs{}, err + } + + var pEtxs types.PendingEtxs + if err := json.Unmarshal(raw, &pEtxs); err != nil { + return types.PendingEtxs{}, err + } + return pEtxs, nil +} + func (ec *Client) SendPendingEtxsToDom(ctx context.Context, pEtxs types.PendingEtxs) error { fields := make(map[string]interface{}) fields["header"] = pEtxs.Header.RPCMarshalHeader()