The node package defines the Service interface shown below; eth is one implementation of that interface.
// Service is the interface a component implements so that it can be run by a
// node: it exposes its P2P protocols and RPC APIs and has a Start/Stop life cycle.
type Service interface {
// Protocols retrieves the P2P protocols the service wishes to start.
Protocols() []p2p.Protocol
// APIs retrieves the list of RPC descriptors the service provides.
APIs() []rpc.API
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
Start(server *p2p.Server) error
// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
Stop() error
}
The eth directory of go-ethereum implements the Ethereum service. The service is injected into the node through the node's Register method.
// RegisterEthService adds an Ethereum client to the stack.
//
// In light sync mode a les light client is registered. Otherwise a full eth
// node is registered and, when cfg.LightServ > 0, a LES server is created and
// attached to it. A failure to register the service is fatal.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
	var err error
	if cfg.SyncMode == downloader.LightSync {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			return les.New(ctx, cfg)
		})
	} else {
		err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
			fullNode, err := eth.New(ctx, cfg)
			if err != nil {
				// Return a literal nil so the caller never sees a non-nil
				// interface wrapping a nil *eth.Ethereum.
				return nil, err
			}
			if cfg.LightServ > 0 {
				// Do not attach a LES server that failed to initialise: a nil
				// pointer stored in the LesServer interface would compare as
				// non-nil and be used later by Protocols and Start.
				ls, err := les.NewLesServer(fullNode, cfg)
				if err != nil {
					return nil, err
				}
				fullNode.AddLesServer(ls)
			}
			return fullNode, nil
		})
	}
	if err != nil {
		Fatalf("Failed to register the Ethereum service: %v", err)
	}
}
Data structure of the Ethereum protocol
// Ethereum implements the Ethereum full node service.
type Ethereum struct {
config *Config // Service configuration
chainConfig *params.ChainConfig // Chain configuration
// Channel for shutting down the service
shutdownChan chan bool // Channel for shutting down the ethereum
stopDbUpgrade func() error // stop chain db sequential key upgrade
// Handlers
txPool *core.TxPool // Transaction pool
blockchain *core.BlockChain // Blockchain
protocolManager *ProtocolManager // Protocol manager
lesServer LesServer // Light-client (LES) server, nil unless one was added
// DB interfaces
chainDb ethdb.Database // Block chain database
eventMux *event.TypeMux // Event multiplexer taken from the service context
engine consensus.Engine // Consensus engine (proof-of-work by default)
accountManager *accounts.Manager // Account manager
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
ApiBackend *EthApiBackend // API backend for use by RPC services
miner *miner.Miner // Miner
gasPrice *big.Int // Minimum gas price accepted by this node; transactions offering less are rejected
etherbase common.Address // Address of the miner
networkId uint64 // Network ID used to tell networks apart (mainnet is 1)
netRPCService *ethapi.PublicNetAPI // "net" RPC service, created in Start
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
}
Creating the Ethereum protocol object: New.
For now this walkthrough does not go into the contents of the core package and only gives a general introduction. The contents of core will be analyzed later.
// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object).
//
// It rejects light sync and invalid sync modes, opens the chain database,
// sets up the genesis block and then wires together the blockchain, bloom
// indexer, transaction pool, protocol manager, miner, API backend and gas
// price oracle. On any failure it returns a nil *Ethereum and the error.
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
}
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
// Open (or create) the chain database named "chaindata".
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
return nil, err
}
// Start the database upgrade; the returned function is stored in
// eth.stopDbUpgrade and stops that upgrade.
stopDbUpgrade := upgradeDeduplicateData(chainDb)
// Set up the genesis block. If the database already holds a genesis block it
// is read from there (e.g. a private chain); otherwise the default from the
// code is used.
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
// A *params.ConfigCompatError is tolerated here and handled further down by
// rewinding the chain; any other genesis error aborts.
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
eth := &Ethereum{
config: config,
chainDb: chainDb,
chainConfig: chainConfig,
eventMux: ctx.EventMux,
accountManager: ctx.AccountManager,
engine: CreateConsensusEngine(ctx, config, chainConfig, chainDb), // Consensus engine
shutdownChan: make(chan bool),
stopDbUpgrade: stopDbUpgrade,
networkId: config.NetworkId, // Used to distinguish networks
gasPrice: config.GasPrice, // Transactions priced below this are discarded
etherbase: config.Etherbase, // Miner address
bloomRequests: make(chan chan *bloombits.Retrieval), // Bloom filter retrieval requests
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks),
}
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
if !config.SkipBcVersionCheck { // Check that the BlockChainVersion stored in the database matches the client's BlockChainVersion (0 means none stored yet).
bcVersion := core.GetBlockChainVersion(chainDb)
if bcVersion != core.BlockChainVersion && bcVersion != 0 {
return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)
}
core.WriteBlockChainVersion(chainDb, core.BlockChainVersion)
}
vmConfig := vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
// Create the blockchain object on top of the chain database.
eth.blockchain, err = core.NewBlockChain(chainDb, eth.chainConfig, eth.engine, vmConfig)
if err != nil {
return nil, err
}
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
log.Warn("Rewinding chain to upgrade configuration", "err", compat)
eth.blockchain.SetHead(compat.RewindTo)
core.WriteChainConfig(chainDb, genesisHash, chainConfig)
}
// Start the bloom indexer from the current header, fed by chain events.
eth.bloomIndexer.Start(eth.blockchain.CurrentHeader(), eth.blockchain.SubscribeChainEvent)
// Resolve a configured transaction journal path through the service context.
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
// Create the transaction pool, which stores transactions received locally or from the network.
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
// Create the protocol manager.
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
// Create the miner.
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
// ApiBackend serves RPC calls; its gas price oracle is filled in below.
eth.ApiBackend = &EthApiBackend{eth, nil}
// GPO stands for Gas Price Oracle. It predicts the current gas price from the
// most recent transactions; the value can serve as a reference when pricing a
// later transaction. The configured gas price is used when no default is set.
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice
}
eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams)
return eth, nil
}
ApiBackend (of type EthApiBackend) is defined in the api_backend.go file. It wraps a number of functions of the Ethereum object.
// EthApiBackend implements ethapi.Backend for full nodes
type EthApiBackend struct {
eth *Ethereum // Full node service the backend delegates to
gpo *gasprice.Oracle // Gas price oracle; New sets it after constructing the backend
}
// SetHead cancels the downloader's current activity and then sets the head of
// the local blockchain to the given block number.
func (b *EthApiBackend) SetHead(number uint64) {
	node := b.eth

	// The downloader is cancelled first, before the head is moved.
	node.protocolManager.downloader.Cancel()
	node.blockchain.SetHead(number)
}
Besides calling into the core package, the New method creates a ProtocolManager, which is one of the more important objects of the Ethereum protocol. Ethereum is, at its heart, a protocol, and the ProtocolManager manages its sub-protocols (one per supported protocol version).
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
//
// It builds one p2p.Protocol per compatible entry of ProtocolVersions, then
// constructs the downloader and the fetcher. errIncompatibleConfig is returned
// when no protocol version is compatible with the sync mode.
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkId: networkId,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chaindb: chaindb,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
mode = downloader.FullSync
}
if mode == downloader.FastSync {
manager.fastSync = uint32(1)
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
version := version // Per-iteration copy captured by the Run closure
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
// Run is the protocol's callback in the p2p layer: once the p2p peer
// connection has been established, the Run method is called.
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
// Peers are looked up by the hex form of the first 8 bytes of the node ID.
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
// The downloader is the full chain synchronisation tool: it is responsible for
// synchronising our own data from other peers.
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
// validator uses the consensus engine to verify a block header.
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
// heighter returns the current block height.
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()
}
// inserter imports propagated blocks into the chain. While fast sync is
// running the blocks are discarded instead of being inserted.
inserter := func(blocks types.Blocks) (int, error) {
// If fast sync is running, deny importing weird blocks
if atomic.LoadUint32(&manager.fastSync) == 1 {
log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
// From now on transactions are accepted.
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
// Then insert the blocks.
return manager.blockchain.InsertChain(blocks)
}
// Create the fetcher.
// The fetcher is responsible for accumulating block notifications from various peers and arranging for retrieval.
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
return manager, nil
}
The APIs() method of the service returns the RPC methods exposed by the service.
// APIs returns the collection of RPC services the ethereum package offers.
// NOTE, some of these services probably need to be moved to somewhere else.
func (s *Ethereum) APIs() []rpc.API {
// Start from the APIs that ethapi builds on top of the API backend.
apis := ethapi.GetAPIs(s.ApiBackend)
// Append any APIs exposed explicitly by the consensus engine
apis = append(apis, s.engine.APIs(s.BlockChain())...)
// Append all the local APIs and return
return append(apis, []rpc.API{
{
Namespace: "eth",
Version: "1.0",
Service: NewPublicEthereumAPI(s),
Public: true,
},
// (the remaining API entries are elided in this walkthrough)
...
, {
Namespace: "net",
Version: "1.0",
Service: s.netRPCService,
Public: true,
},
}...)
}
The Protocols method of the service returns the p2p protocols the service provides: all SubProtocols of the protocol manager, plus the protocols of the lesServer if there is one. As can be seen, all network functionality is provided through protocols.
// Protocols implements node.Service. It returns the protocol manager's
// sub-protocols, followed by the LES server's protocols when a LES server
// has been added.
func (s *Ethereum) Protocols() []p2p.Protocol {
	protos := s.protocolManager.SubProtocols
	if s.lesServer != nil {
		protos = append(protos, s.lesServer.Protocols()...)
	}
	return protos
}
After the Ethereum service has been created, the service's Start method is called. Let's take a look at the Start method.
// Start implements node.Service. It launches the bloom request handlers,
// creates the net RPC service, starts the protocol manager with the computed
// peer limit and finally starts the LES server if one is attached. It always
// returns nil.
func (s *Ethereum) Start(srvr *p2p.Server) error {
	// Bloom bits servicing goroutines come up first.
	s.startBloomHandlers()

	// The net RPC service is bound to the running p2p server.
	s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

	// When serving light clients, their slots are subtracted from the server
	// limit, but the result never drops below half of the server limit.
	peerLimit := srvr.MaxPeers
	if s.config.LightServ > 0 {
		peerLimit -= s.config.LightPeers
		if half := srvr.MaxPeers / 2; peerLimit < half {
			peerLimit = half
		}
	}

	// Bring up the networking layer, then the light server if requested.
	s.protocolManager.Start(peerLimit)
	if ls := s.lesServer; ls != nil {
		ls.Start(srvr)
	}
	return nil
}
Protocol Manager data structure
// ProtocolManager holds the state of the eth sub-protocols: the connected
// peers, the downloader and fetcher, and the channels and subscriptions used
// by its background goroutines.
type ProtocolManager struct {
networkId uint64 // Network ID sent in, and checked during, the handshake
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
blockchain *core.BlockChain
chaindb ethdb.Database
chainconfig *params.ChainConfig
maxPeers int // Peer limit set by Start; handle rejects peers once it is reached
downloader *downloader.Downloader // Synchronises chain data from other peers
fetcher *fetcher.Fetcher // Accumulates block notifications from peers and arranges retrieval
peers *peerSet // Registered peers
SubProtocols []p2p.Protocol // One entry per compatible protocol version
eventMux *event.TypeMux
txCh chan core.TxPreEvent // Receives TxPreEvent notifications from the transaction pool
txSub event.Subscription // Subscription that feeds txCh
minedBlockSub *event.TypeMuxSubscription // Subscription to core.NewMinedBlockEvent
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer // Receives each new peer from the protocol Run callback
txsyncCh chan *txsync
quitSync chan struct{} // When it fires, the Run callback returns p2p.DiscQuitting
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
}
The Start method of the protocol manager. This method starts several goroutines that handle various tasks. It is reasonable to infer that this type is the main implementation of the Ethereum service.
// Start records the peer limit, subscribes to transaction and mined-block
// events, and launches the broadcast and synchronisation goroutines.
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
// txCh is the channel on which the transaction pool delivers TxPreEvent
// notifications; the transaction broadcast goroutine forwards them to the network.
pm.txCh = make(chan core.TxPreEvent, txChanSize)
// Subscribe txCh to the transaction pool's TxPreEvent notifications.
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
// Start the broadcast goroutine
go pm.txBroadcastLoop()
// broadcast mined blocks
// Subscribe to NewMinedBlockEvent, which is posted when a new block has been
// mined. Unlike the transaction subscription above, this one uses the
// event.TypeMux subscription mechanism, which is marked as deprecated.
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
// Mined-block broadcast goroutine: a freshly mined block should reach the
// network as soon as possible.
go pm.minedBroadcastLoop()
// start sync handlers
// The syncer is responsible for periodically synchronising with the network, downloading hashes and blocks, and processing notification handlers.
go pm.syncer()
// txsyncLoop is responsible for the initial transaction synchronisation for each new connection. When a new peer appears, all pending transactions are forwarded to it. To minimise outbound bandwidth usage, only one packet is sent at a time.
go pm.txsyncLoop()
}
Once the p2p server has been started, it actively looks for nodes to connect to and also accepts connections from other nodes. Establishing a connection consists of a handshake for the encrypted channel followed by a protocol handshake. Finally, a goroutine is started for each protocol to execute its Run method, which hands control over to the protocol itself. The Run method first creates a peer object and then calls the handle method to handle that peer.
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer: // Send the peer to the newPeerCh channel
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer) // Call the handle method, which manages the peer until it disconnects
case <-manager.quitSync: // quitSync fired: refuse the peer with a quitting disconnect
return p2p.DiscQuitting
}
},
The handle method:
// handle is the callback invoked to manage the life cycle of an eth peer. When
// this function terminates, the peer is disconnected.
//
// It enforces the peer limit, performs the eth handshake, registers the peer
// locally and with the downloader, sends it the pending transactions,
// optionally starts the DAO fork check and then processes messages until
// handleMsg returns an error, which is returned to the caller.
func (pm *ProtocolManager) handle(p *peer) error {
if pm.peers.Len() >= pm.maxPeers {
return p2p.DiscTooManyPeers
}
p.Log().Debug("Ethereum peer connected", "name", p.Name())
// Execute the Ethereum handshake
td, head, genesis := pm.blockchain.Status()
// td is the total difficulty, head is the current head block hash and genesis
// is the genesis block hash. The handshake only succeeds if both sides have
// the same genesis block.
if err := p.Handshake(pm.networkId, td, head, genesis); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
// From here on the peer is removed again whenever handle returns.
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
}
}()
}
// Main loop: handle incoming messages until one of them fails.
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}
}
}
Handshake
// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
//
// Our own status message is sent and the remote status is read concurrently.
// The first error from either side is returned; if both have not completed
// before handshakeTimeout elapses, p2p.DiscReadTimeout is returned. On
// success the peer's total difficulty and head are recorded on p.
func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error {
	var (
		// Capacity 2: one slot for each of the two goroutines below, so
		// neither can block on its send.
		results = make(chan error, 2)
		// Only safe to read after two values have been received from results.
		remote statusData
	)

	// Send our own status in a separate goroutine.
	go func() {
		local := &statusData{
			ProtocolVersion: uint32(p.version),
			NetworkId:       network,
			TD:              td,
			CurrentBlock:    head,
			GenesisBlock:    genesis,
		}
		results <- p2p.Send(p.rw, StatusMsg, local)
	}()
	// Read and validate the remote status in parallel.
	go func() {
		results <- p.readStatus(network, &remote, genesis)
	}()

	deadline := time.NewTimer(handshakeTimeout)
	defer deadline.Stop()

	// Wait for both halves; any error or a timeout ends the handshake.
	for pending := 2; pending > 0; pending-- {
		select {
		case err := <-results:
			if err != nil {
				return err
			}
		case <-deadline.C:
			return p2p.DiscReadTimeout
		}
	}
	p.td, p.head = remote.TD, remote.CurrentBlock
	return nil
}
readStatus checks the status message returned by the peer against the various failure cases:
// readStatus reads the first message from the peer and decodes it into status.
// It returns an error if reading fails, if the message is not a status
// message, if it exceeds ProtocolMaxMsgSize, if it cannot be decoded, or if
// the genesis block, network ID or protocol version differ from ours.
func (p *peer) readStatus(network uint64, status *statusData, genesis common.Hash) (err error) {
	msg, err := p.rw.ReadMsg()

	// Checks that can be made before decoding the payload.
	switch {
	case err != nil:
		return err
	case msg.Code != StatusMsg:
		return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
	case msg.Size > ProtocolMaxMsgSize:
		return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
	}

	// Decode the handshake and make sure everything matches.
	if decodeErr := msg.Decode(&status); decodeErr != nil {
		return errResp(ErrDecode, "msg %v: %v", msg, decodeErr)
	}
	switch {
	case status.GenesisBlock != genesis:
		return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock[:8], genesis[:8])
	case status.NetworkId != network:
		return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network)
	case int(status.ProtocolVersion) != p.version:
		return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version)
	}
	return nil
}
Register simply adds the peer to the peer set's map.
// Register adds p to the working set under the set's lock. It returns
// errClosed if the set has been closed and errAlreadyRegistered if a peer
// with the same id is already present.
func (ps *peerSet) Register(p *peer) error {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	_, known := ps.peers[p.id]
	switch {
	case ps.closed:
		return errClosed
	case known:
		return errAlreadyRegistered
	}
	ps.peers[p.id] = p
	return nil
}
After this series of checks and the handshake, handle repeatedly calls the handleMsg method in a loop to process incoming messages. The method is very long; it mainly consists of the responses to the various message types that can be received.
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is turn down upon returning any error.
func (pm *ProtocolManager) handleMsg(p *peer) error {
// Read the next message from the remote peer, and ensure it's fully consumed
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
if msg.Size > ProtocolMaxMsgSize {
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
}
defer msg.Discard()
// Handle the message depending on its contents
switch {
case msg.Code == StatusMsg:
// Status messages should never arrive after the handshake
// StatusMsg should be received during the HandleShake phase. After HandleShake, you should not receive such a message.
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
// Block header query, collect the requested headers and reply
// Upon receiving the message requesting the header of the block, it will return the block header information according to the request.
case msg.Code == GetBlockHeadersMsg:
// Decode the complex header query
var query getBlockHeadersData
if err := msg.Decode(&query); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
hashMode := query.Origin.Hash != (common.Hash{})
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []*types.Header
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
// Retrieve the next header satisfying the query
var origin *types.Header
if hashMode {
origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
} else {
origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}
number := origin.Number.Uint64()
headers = append(headers, origin)
bytes += estHeaderRlpSize
// Advance to the next header of the query
switch {
case query.Origin.Hash != (common.Hash{}) && query.Reverse:
// Hash based traversal towards the genesis block
for i := 0; i < int(query.Skip)+1; i++ {
if header := pm.blockchain.GetHeader(query.Origin.Hash, number); header != nil {// Get the previous block header by hash and number
query.Origin.Hash = header.ParentHash
number--
} else {
unknown = true
break // Break is the jump out of the switch. Unknow is used to jump out of the loop.
}
}
case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
// Hash based traversal towards the leaf block
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current { // Forward, but next is smaller than the current, against integer overflow attacks.
infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
if pm.blockchain.GetBlockHashesFromHash(header.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
// If you can find this header, and this header and origin are on the same chain.
query.Origin.Hash = header.Hash()
} else {
unknown = true
}
} else {
unknown = true
}
}
case query.Reverse: // find by number
// Number based traversal towards the genesis block
// query.Origin.Hash == (common.Hash{})
if query.Origin.Number >= query.Skip+1 {
query.Origin.Number -= (query.Skip + 1)
} else {
unknown = true
}
case !query.Reverse: /
// Number based traversal towards the leaf block
query.Origin.Number += (query.Skip + 1)
}
}
return p.SendBlockHeaders(headers)
case msg.Code == BlockHeadersMsg: // Received the result from GetBlockHeadersMsg.
// A batch of headers arrived to one of our previous requests
var headers []*types.Header
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// If no headers were received, but we're expending a DAO fork check, maybe it's that
// If the peer does not return any headers, and forkDrop is not empty, then it should be our DAO check request, we have previously sent a DAO header request in HandShake.
if len(headers) == 0 && p.forkDrop != nil {
// Possibly an empty reply to the fork header checks, sanity check TDs
verifyDAO := true
// If we already have a DAO header, we can check the peer's TD against it. If
// the peer's ahead of this, it too must have a reply to the DAO check
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
// This time check whether the peer's total difficult has exceeded the td value of the DAO fork block, if it exceeds, it means that the peer should have this block header, but the returned blank, then the verification fails. Nothing is done here. If the peer does not send it, it will be timed out.
verifyDAO = false
}
}
// If we're seemingly on the same chain, disable the drop timer
if verifyDAO { // If the verification is successful, delete the timer and return.
p.Log().Debug("Seems to be on the same side of the DAO fork")
p.forkDrop.Stop()
p.forkDrop = nil
return nil
}
}
// Filter out any explicitly requested headers, deliver the rest to the downloader
// If the length is 1, then filter is true
filter := len(headers) == 1
if filter {
// If it's a potential DAO fork check, validate against the rules
if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
// Disable the fork drop timer
p.forkDrop.Stop()
p.forkDrop = nil
// Validate the header and either drop the peer or continue
if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
return err
}
p.Log().Debug("Verified to be on the same side of the DAO fork")
return nil
}
// Irrelevant of the fork checks, send the header to the fetcher just in case
// If it is not a DAO request, pass it to the filter for filtering. The filter will return the headers that need to be processed, and these headers will be handed over to the downloader for distribution.
headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
}
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
if err != nil {
log.Debug("Failed to deliver headers", "err", err)
}
}
case msg.Code == GetBlockBodiesMsg:
// Block Body's request This is relatively simple. Get the body return from the blockchain.
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather blocks until the fetch or network limits is reached
var (
hash common.Hash
bytes int
bodies []rlp.RawValue
)
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
// Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested block body, stopping if enough was found
if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
}
}
return p.SendBlockBodiesRLP(bodies)
case msg.Code == BlockBodiesMsg:
// A batch of block bodies arrived to one of our previous requests
var request blockBodiesData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver them all to the downloader for queuing
trasactions := make([][]*types.Transaction, len(request))
uncles := make([][]*types.Header, len(request))
for i, body := range request {
trasactions[i] = body.Transactions
uncles[i] = body.Uncles
}
// Filter out any explicitly requested bodies, deliver the rest to the downloader
filter := len(trasactions) > 0 || len(uncles) > 0
if filter {
trasactions, uncles = pm.fetcher.FilterBodies(p.id, trasactions, uncles, time.Now())
}
if len(trasactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, trasactions, uncles)
if err != nil {
log.Debug("Failed to deliver bodies", "err", err)
}
}
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
// The version of the peer is eth63 and is requesting NodeData
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather state data until the fetch or network limits is reached
var (
hash common.Hash
bytes int
data [][]byte
)
for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
// Retrieve the hash of the next state entry
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested state entry, stopping if enough was found
// Any hash value requested will be returned to the other party.
if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
data = append(data, entry)
bytes += len(entry)
}
}
return p.SendNodeData(data)
case p.version >= eth63 && msg.Code == NodeDataMsg:
// A batch of node state data arrived to one of our previous requests
var data [][]byte
if err := msg.Decode(&data); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver all to the downloader
if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
log.Debug("Failed to deliver node state data", "err", err)
}
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather state data until the fetch or network limits is reached
var (
hash common.Hash
bytes int
receipts []rlp.RawValue
)
for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
// Retrieve the hash of the next block
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested block's receipts, skipping if unknown to us
results := core.GetBlockReceipts(pm.chaindb, hash, core.GetBlockNumber(pm.chaindb, hash))
if results == nil {
if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
}
// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
log.Error("Failed to encode receipt", "err", err)
} else {
receipts = append(receipts, encoded)
bytes += len(encoded)
}
}
return p.SendReceiptsRLP(receipts)
case p.version >= eth63 && msg.Code == ReceiptsMsg:
// A batch of receipts arrived to one of our previous requests
var receipts [][]*types.Receipt
if err := msg.Decode(&receipts); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Deliver all to the downloader
if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
log.Debug("Failed to deliver receipts", "err", err)
}
case msg.Code == NewBlockHashesMsg:
// Received BlockHashesMsg message
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
// Mark the hashes as present at the remote node
for _, block := range announces {
p.MarkBlock(block.Hash)
}
// Schedule all the unknown hashes for retrieval
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
for _, block := range unknown {
// Notify fetcher that there is a potential block to download
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}
case msg.Code == NewBlockMsg:
// Retrieve and decode the propagated block
var request newBlockData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
request.Block.ReceivedAt = msg.ReceivedAt
request.Block.ReceivedFrom = p
// Mark the peer as owning the block and schedule it for import
p.MarkBlock(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block)
// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
var (
trueHead = request.Block.ParentHash()
trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
)
// Update the peers total difficulty if better than the previous
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
// If the real TD and head of the peer are different from those recorded on our side, set the real head and td of the peer.
p.SetHead(trueHead, trueTD)
// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
// a singe block (as the true TD is below the propagated block), however this
// scenario should easily be covered by the fetcher.
currentBlock := pm.blockchain.CurrentBlock()
if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
go pm.synchronise(p)
}
}
case msg.Code == TxMsg:
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
}
// Transactions can be processed, parse all of them and deliver to the pool
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
}
// Add to txpool
pm.txpool.AddRemotes(txs)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
return nil
}
The synchronise method is called when a remote peer's chain is more up to date than our own, i.e. when the peer's total difficulty is higher than ours.
// synchronise attempts to bring the local block chain up to date with the
// given remote peer. It returns immediately when peer is nil or when the
// peer's total difficulty does not exceed our own.
func (pm *ProtocolManager) synchronise(peer *peer) {
	// Nothing to do without a peer.
	if peer == nil {
		return
	}
	// Only sync against a peer whose total difficulty is strictly above ours.
	localHead := pm.blockchain.CurrentBlock()
	localTD := pm.blockchain.GetTd(localHead.Hash(), localHead.NumberU64())

	peerHead, peerTD := peer.Head()
	if peerTD.Cmp(localTD) <= 0 {
		return
	}
	// Choose the sync mode: full sync unless fast sync is, or has to be, enabled.
	mode := downloader.FullSync
	switch {
	case atomic.LoadUint32(&pm.fastSync) == 1:
		// Fast sync was explicitly requested, and explicitly granted.
		mode = downloader.FastSync
	case localHead.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0:
		// Our head is still the genesis block, yet the fast block is ahead of
		// it, so fast sync must have been enabled for this node at some point.
		// That can only happen if the node was rolled back (manually or via a
		// bad block) below the sync point, in which case re-enabling fast sync
		// is safe.
		atomic.StoreUint32(&pm.fastSync, 1)
		mode = downloader.FastSync
	}
	// Run the sync cycle. Its error is examined only after the fast sync flag
	// has been handled, so the flag is updated even if the cycle failed.
	syncErr := pm.downloader.Synchronise(peer.id, peerHead, peerTD, mode)

	// Switch fast sync off as soon as the chain holds anything beyond genesis.
	if atomic.LoadUint32(&pm.fastSync) == 1 && pm.blockchain.CurrentBlock().NumberU64() > 0 {
		log.Info("Fast sync complete, auto disabling")
		atomic.StoreUint32(&pm.fastSync, 0)
	}
	if syncErr != nil {
		return
	}
	// Initial sync is done: from now on incoming transactions are accepted.
	atomic.StoreUint32(&pm.acceptTxs, 1)

	// A sync cycle completed, so tell all peers about our new head. This matters
	// most in star-topology networks, where a gateway node has to inform its
	// out-of-date peers that a new block is available; it typically shows up in
	// private and hackathon networks with degenerate connectivity, but it also
	// helps the mainnet update peers or the local TD state more reliably.
	head := pm.blockchain.CurrentBlock()
	if head.NumberU64() > 0 {
		go pm.BroadcastBlock(head, false)
	}
}
Transaction broadcasting. txBroadcastLoop is a goroutine that is launched at startup. Whenever the txpool receives a valid transaction, it writes an event to txCh; the loop then broadcasts that transaction to the peers.
// txBroadcastLoop forwards every transaction event received on txCh to
// BroadcastTx. It returns once the error channel of the transaction
// subscription yields, which happens when the subscription is cancelled.
func (pm *ProtocolManager) txBroadcastLoop() {
	for {
		select {
		case <-pm.txSub.Err():
			// Err() channel will be closed when unsubscribing.
			return

		case ev := <-pm.txCh:
			tx := ev.Tx
			pm.BroadcastTx(tx.Hash(), tx)
		}
	}
}
Mined-block broadcasting. A newly mined block is broadcast as soon as the corresponding subscription event is received.
// minedBroadcastLoop relays newly mined blocks to the network. It consumes the
// mined-block subscription channel and stops automatically once that channel
// is closed by unsubscribing.
func (pm *ProtocolManager) minedBroadcastLoop() {
	for obj := range pm.minedBlockSub.Chan() {
		// Events of any other type are ignored.
		mined, ok := obj.Data.(core.NewMinedBlockEvent)
		if !ok {
			continue
		}
		pm.BroadcastBlock(mined.Block, true)  // First propagate block to peers
		pm.BroadcastBlock(mined.Block, false) // Only then announce to the rest
	}
}
The syncer is responsible for synchronizing with the network regularly.
// syncer drives synchronisation with the network. It starts the fetcher, then
// launches a sync attempt against the best peer whenever a new peer shows up
// (provided enough peers are connected) and on every tick of the force-sync
// timer. It runs until noMorePeers fires.
func (pm *ProtocolManager) syncer() {
	// Bring up the fetcher; on exit stop the downloader first, then the fetcher
	// (deferred calls run in reverse order).
	pm.fetcher.Start()
	defer pm.fetcher.Stop()
	defer pm.downloader.Terminate()

	// Ticker that forces a sync attempt every forceSyncCycle.
	ticker := time.NewTicker(forceSyncCycle)
	defer ticker.Stop()

	for {
		select {
		case <-pm.noMorePeers:
			// Exit signal.
			return

		case <-pm.newPeerCh:
			// A peer joined: sync only once enough peers are available to
			// choose from.
			if pm.peers.Len() >= minDesiredPeerCount {
				go pm.synchronise(pm.peers.BestPeer())
			}

		case <-ticker.C:
			// Timer fired: force a sync even if not enough peers are present.
			go pm.synchronise(pm.peers.BestPeer())
		}
	}
}
txsyncLoop is responsible for sending the pending transactions to newly established connections.
// txsyncLoop performs the initial transaction sync for every new connection.
// Each txsync received on txsyncCh carries a peer and the transactions to
// relay to it. To keep egress bandwidth low, the transactions are sent in
// small packs, to one peer at a time. The loop exits when quitSync fires.
func (pm *ProtocolManager) txsyncLoop() {
	var (
		queued   = make(map[discover.NodeID]*txsync) // syncs that still have transactions left
		inFlight = false                             // whether a send is active
		batch    = new(txsync)                       // the pack that is being sent
		results  = make(chan error, 1)               // result of the send
	)

	// dispatch moves the next pack of transactions from s into batch and sends
	// it in the background.
	dispatch := func(s *txsync) {
		// Fill the batch with transactions up to the target size.
		batch.p = s.p
		batch.txs = batch.txs[:0]

		var total common.StorageSize
		for _, tx := range s.txs {
			if total >= txsyncPackSize {
				break
			}
			batch.txs = append(batch.txs, tx)
			total += tx.Size()
		}
		// Drop the transactions that are about to be sent by shifting the rest
		// to the front; forget the sync entirely once it is drained.
		left := copy(s.txs, s.txs[len(batch.txs):])
		s.txs = s.txs[:left]
		if len(s.txs) == 0 {
			delete(queued, s.p.ID())
		}
		// Send the batch in the background.
		s.p.Log().Trace("Sending batch of transactions", "count", len(batch.txs), "bytes", total)
		inFlight = true
		go func() { results <- batch.p.SendTransactions(batch.txs) }()
	}

	// choose returns a randomly selected queued sync, or nil if none is left.
	choose := func() *txsync {
		if len(queued) == 0 {
			return nil
		}
		skip := rand.Intn(len(queued))
		for _, s := range queued {
			if skip == 0 {
				return s
			}
			skip--
		}
		return nil
	}

	for {
		select {
		case <-pm.quitSync:
			return

		case s := <-pm.txsyncCh:
			// New sync request: queue it, and start right away if idle.
			queued[s.p.ID()] = s
			if !inFlight {
				dispatch(s)
			}

		case err := <-results:
			inFlight = false
			// Stop tracking peers that cause send failures.
			if err != nil {
				batch.p.Log().Debug("Transaction send failed", "err", err)
				delete(queued, batch.p.ID())
			}
			// Schedule the next send.
			if next := choose(); next != nil {
				dispatch(next)
			}
		}
	}
}
syncTransactions is the producer for the txsyncCh queue. It is called inside the handle method, once for each connection right after it has been established.
// syncTransactions hands all currently pending transactions over to the
// transaction sync loop so that they get relayed to the given peer. Nothing is
// queued when the pool has no pending transactions.
func (pm *ProtocolManager) syncTransactions(p *peer) {
	// Flatten the pool's pending batches into a single list. The error
	// returned by Pending is ignored here.
	batches, _ := pm.txpool.Pending()

	var all types.Transactions
	for _, batch := range batches {
		all = append(all, batch...)
	}
	if len(all) == 0 {
		return
	}
	// Queue the sync request, giving up if the manager is shutting down.
	request := &txsync{p: p, txs: all}
	select {
	case <-pm.quitSync:
	case pm.txsyncCh <- request:
	}
}
To summarize, these are the main processes.
Block synchronization
- If the block was mined locally, it is broadcast by the minedBroadcastLoop() goroutine.
- If a block broadcast is received from another peer (NewBlockHashesMsg/NewBlockMsg), it is handed to the fetcher: announced hashes via Notify, full blocks via Enqueue.
- The syncer() goroutine periodically synchronises with the peer returned by BestPeer().
Transaction synchronization
- When a new connection is established, the pending transactions are sent to that peer.
- When a transaction is submitted locally, or a transaction sent by another peer is received, the txpool generates an event that is passed to the txCh channel. It is then processed by the txBroadcastLoop() goroutine and sent to the other peers that do not yet know the transaction.