Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fabric/core/deliverservice/blockprovider/blocksprovider.go #14

Open
yfhk opened this issue Nov 21, 2018 · 2 comments
Open

fabric/core/deliverservice/blockprovider/blocksprovider.go #14

yfhk opened this issue Nov 21, 2018 · 2 comments

Comments

@yfhk
Copy link
Owner

yfhk commented Nov 21, 2018

// DeliverBlocks used to pull out blocks from the ordering service to
// distributed them across peers
func (b *blocksProviderImpl) DeliverBlocks() {
errorStatusCounter := 0
statusCounter := 0
defer b.client.Close()
for !b.isDone() {
msg, err := b.client.Recv()
if err != nil {
logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error())
return
}
switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Status:
if t.Status == common.Status_SUCCESS {
logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
return
}
if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
logger.Errorf("[%s] Got error %v", b.chainID, t)
errorStatusCounter++
if errorStatusCounter > b.wrongStatusThreshold {
logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
return
}
} else {
errorStatusCounter = 0
logger.Warningf("[%s] Got error %v", b.chainID, t)
}
maxDelay := float64(maxRetryDelay)
currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
if currDelay < maxDelay {
statusCounter++
}
if t.Status == common.Status_BAD_REQUEST {
b.client.Disconnect(false)
} else {
b.client.Disconnect(true)
}
continue
case *orderer.DeliverResponse_Block:
errorStatusCounter = 0
statusCounter = 0
seqNum := t.Block.Header.Number

		marshaledBlock, err := proto.Marshal(t.Block)
		if err != nil {
			logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, seqNum, err)
			continue
		}
		if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock); err != nil {
			logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, seqNum, err)
			continue
		}

		//add by huxiao			
		record.RecordNewBlockToFile(t.Block)

		numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
		// Create payload with a block received
		payload := createPayload(seqNum, marshaledBlock)
		// Use payload to create gossip message
		gossipMsg := createGossipMsg(b.chainID, payload)

		logger.Debugf("[%s] Adding payload locally, buffer seqNum = [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
		// Add payload to local state payloads buffer
		if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
			logger.Warning("Failed adding payload of", seqNum, "because:", err)
		}

		// Gossip messages with other nodes
		logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
		b.gossip.Gossip(gossipMsg)
	default:
		logger.Warningf("[%s] Received unknown: ", b.chainID, t)
		return
	}
}

}

@yfhk
Copy link
Owner Author

yfhk commented Nov 21, 2018

// StartDeliverForChannel starts blocks delivery for channel
// initializes the grpc stream for given chainID, creates blocks provider instance
// that spawns in go routine to read new blocks starting from the position provided by ledger
// info instance.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
}
if _, exist := d.blockProviders[chainID]; exist {
errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
logger.Errorf(errMsg)
return errors.New(errMsg)
} else {
client := d.newClient(chainID, ledgerInfo)
logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
go d.launchBlockProvider(chainID, finalizer)
}
return nil
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 21, 2018

	case *orderer.DeliverResponse_Block:
		errorStatusCounter = 0
		statusCounter = 0
		seqNum := t.Block.Header.Number

		marshaledBlock, err := proto.Marshal(t.Block)
		if err != nil {
			logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, seqNum, err)
			continue
		}
		if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock); err != nil {
			logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, seqNum, err)
			continue
		}

		//add by huxiao			
		record.RecordNewBlockToFile(t.Block)

		numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
		// Create payload with a block received
		payload := createPayload(seqNum, marshaledBlock)
		// Use payload to create gossip message
		gossipMsg := createGossipMsg(b.chainID, payload)

		logger.Debugf("[%s] Adding payload locally, buffer seqNum = [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
		// Add payload to local state payloads buffer
		if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
			logger.Warning("Failed adding payload of", seqNum, "because:", err)
		}

		// Gossip messages with other nodes
		logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
		b.gossip.Gossip(gossipMsg)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant