Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fabric/orderer/common/multichannel/registrar.go #17

Open
yfhk opened this issue Nov 22, 2018 · 17 comments
Open

fabric/orderer/common/multichannel/registrar.go #17

yfhk opened this issue Nov 22, 2018 · 17 comments

Comments

@yfhk
Copy link
Owner

yfhk commented Nov 22, 2018

// NewRegistrar builds a *Registrar and brings up every chain already known to
// the ledger factory.
//
// For each chain ID reported by ledgerFactory, the ledger is opened, its config
// transaction is read and ledger resources are created from it. A chain whose
// resources expose a consortiums config is treated as the system channel; every
// other chain is started immediately. The logger's Panic/Panicf is invoked when
// a ledger cannot be retrieved, a config transaction is nil, a second system
// channel shows up, the system channel's genesis block cannot be read, or no
// system channel exists at all.
func NewRegistrar(ledgerFactory blockledger.Factory, consenters map[string]consensus.Consenter,
	signer crypto.LocalSigner, callbacks ...func(bundle *channelconfig.Bundle)) *Registrar {
	r := &Registrar{
		chains:        make(map[string]*ChainSupport),
		ledgerFactory: ledgerFactory,
		consenters:    consenters,
		signer:        signer,
		callbacks:     callbacks,
	}

	for _, ledgerID := range ledgerFactory.ChainIDs() {
		ledger, err := ledgerFactory.GetOrCreate(ledgerID)
		if err != nil {
			logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", ledgerID, err)
		}

		configTx := getConfigTx(ledger)
		if configTx == nil {
			logger.Panic("Programming error, configTx should never be nil here")
		}

		resources := r.newLedgerResources(configTx)
		// From here on the channel ID is the one carried by the config itself,
		// not the ID the ledger factory reported.
		chainID := resources.ConfigtxValidator().ChainID()

		if _, isSystem := resources.ConsortiumsConfig(); !isSystem {
			// Ordinary channel: register it and start it right away.
			logger.Debugf("Starting chain: %s", chainID)
			chain := newChainSupport(
				r,
				resources,
				consenters,
				signer)
			r.chains[chainID] = chain
			chain.start()
			continue
		}

		// System channel: there must be exactly one.
		if r.systemChannelID != "" {
			logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
		}
		chain := newChainSupport(
			r,
			resources,
			consenters,
			signer)
		r.templator = msgprocessor.NewDefaultTemplator(chain)
		chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain))

		// Read the genesis block only to log its hash (see FAB-5450).
		iter, pos := ledger.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})
		defer iter.Close()
		if pos != uint64(0) {
			logger.Panicf("Error iterating over system channel: '%s', expected position 0, got %d", chainID, pos)
		}
		genesisBlock, status := iter.Next()
		if status != cb.Status_SUCCESS {
			logger.Panicf("Error reading genesis block of system channel '%s'", chainID)
		}
		logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s", chainID, genesisBlock.Header.Hash(), chain.SharedConfig().ConsensusType())

		r.chains[chainID] = chain
		r.systemChannelID = chainID
		r.systemChannel = chain
		// Starting the system chain is deferred until the function returns: it
		// might copy and replace the chains map via newChain, which must not
		// happen before the map is fully built.
		defer chain.start()
	}

	if r.systemChannelID == "" {
		logger.Panicf("No system chain found.  If bootstrapping, does your system channel contain a consortiums group definition?")
	}

	return r
}

@yfhk yfhk changed the title fabric/orderer fabric/orderer/common/multichannel/registrar.go Nov 22, 2018
@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// StartDeliverForChannel starts block delivery for the channel chainID.
//
// While holding the service lock it creates a client for the channel, registers
// a new blocks provider under chainID and launches that provider in its own
// goroutine; finalizer is handed to that goroutine.
//
// It returns an error, and starts nothing, when the service is already stopping
// or when a blocks provider is already registered for chainID.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.stopping {
		errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
		// errMsg is already formatted; log it through "%s" so that a '%' in
		// chainID is not interpreted as a formatting verb.
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	if _, exist := d.blockProviders[chainID]; exist {
		errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	client := d.newClient(chainID, ledgerInfo)
	logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
	d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
	// The provider is looked up again by chainID inside the goroutine.
	go d.launchBlockProvider(chainID, finalizer)
	return nil
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// launchBlockProvider runs the blocks provider registered for chainID and calls
// finalizer after the provider's DeliverBlocks returns. When no provider is
// registered under chainID it only logs that fact; finalizer is not called.
func (d *deliverServiceImpl) launchBlockProvider(chainID string, finalizer func()) {
	// The read lock covers just the map lookup, not the delivery itself.
	d.lock.RLock()
	provider := d.blockProviders[chainID]
	d.lock.RUnlock()

	if provider != nil {
		provider.DeliverBlocks()
		finalizer()
		return
	}

	logger.Info("Block delivery for channel", chainID, "was stopped before block provider started")
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// VerifyBlock validates a serialized block received for channel chainID.
//
// The checks run in this order, and the first failure is returned as an error:
// the bytes unmarshal into a block, the block has a header, the header number
// equals seqNum, the channel ID stored in the block equals chainID, the block
// carries metadata, the signatures metadata can be extracted, the header's
// DataHash equals the hash of the block data, a policy manager is available
// for the channel, and finally the block validation policy accepts the set of
// signatures. The result of that policy evaluation is the return value.
func (s *cisMessageCryptoService) VerifyBlock(chainID common.ChainID, seqNum uint64, signedBlock []byte) error {
	// Decode the raw bytes into a block.
	block, err := utils.GetBlockFromBlockBytes(signedBlock)
	if err != nil {
		return fmt.Errorf("Failed unmarshalling block bytes on channel [%s]: [%s]", chainID, err)
	}

	header := block.Header
	if header == nil {
		return fmt.Errorf("Invalid Block on channel [%s]. Header must be different from nil.", chainID)
	}

	// The sequence number claimed by the sender must match the header.
	if actual := header.Number; seqNum != actual {
		return fmt.Errorf("Claimed seqNum is [%d] but actual seqNum inside block is [%d]", seqNum, actual)
	}

	// The channel ID recorded in the block must match the expected one.
	channelID, err := utils.GetChainIDFromBlock(block)
	if err != nil {
		return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", header.Number, chainID, err)
	}
	if channelID != string(chainID) {
		return fmt.Errorf("Invalid block's channel id. Expected [%s]. Given [%s]", chainID, channelID)
	}

	// Signature metadata is mandatory.
	if block.Metadata == nil || len(block.Metadata.Metadata) == 0 {
		return fmt.Errorf("Block with id [%d] on channel [%s] does not have metadata. Block not valid.", header.Number, chainID)
	}
	metadata, err := utils.GetMetadataFromBlock(block, pcommon.BlockMetadataIndex_SIGNATURES)
	if err != nil {
		return fmt.Errorf("Failed unmarshalling medatata for signatures [%s]", err)
	}

	// The header must be consistent with the data the block carries.
	if !bytes.Equal(block.Data.Hash(), header.DataHash) {
		return fmt.Errorf("Header.DataHash is different from Hash(block.Data) for block with id [%d] on channel [%s]", header.Number, chainID)
	}

	// Look up the policy manager for the channel. The boolean reports whether
	// the requested manager (true) or the default manager (false) was returned.
	cpm, isRequestedManager := s.channelPolicyManagerGetter.Manager(channelID)
	if cpm == nil {
		return fmt.Errorf("Could not acquire policy manager for channel %s", channelID)
	}
	mcsLogger.Debugf("Got policy manager for channel [%s] with flag [%t]", channelID, isRequestedManager)

	// Fetch the block validation policy. The boolean reports whether the
	// requested policy (true) or the default policy (false) was returned.
	policy, isRequestedPolicy := cpm.GetPolicy(policies.BlockValidation)
	mcsLogger.Debugf("Got block validation policy for channel [%s] with flag [%t]", channelID, isRequestedPolicy)

	// Build one SignedData entry per signature found in the metadata.
	signatureSet := []*pcommon.SignedData{}
	for _, sig := range metadata.Signatures {
		sigHeader, err := utils.GetSignatureHeader(sig.SignatureHeader)
		if err != nil {
			return fmt.Errorf("Failed unmarshalling signature header for block with id [%d] on channel [%s]: [%s]", header.Number, chainID, err)
		}
		signed := &pcommon.SignedData{
			FromAddr: sigHeader.Creator,
			// Signed payload: metadata value, then signature header, then block header bytes.
			Data:      util.ConcatenateBytes(metadata.Value, sig.SignatureHeader, header.Bytes()),
			Signature: sig.Signature,
		}
		signatureSet = append(signatureSet, signed)
	}

	// The policy decides whether the collected signatures are sufficient.
	return policy.Evaluate(signatureSet)
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

其中,
// - Extract channelID and compare with chainID
channelID, err := utils.GetChainIDFromBlock(block)
if err != nil {
return fmt.Errorf("Failed getting channel id from block with id [%d] on channel [%s]: [%s]", block.Header.Number, chainID, err)
}

if channelID != string(chainID) {
	return fmt.Errorf("Invalid block's channel id. Expected [%s]. Given [%s]", chainID, channelID)
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

orderer端:

chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
	// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
	// So we would expect our log to be somewhat flooded with these
	logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
	return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
	stopNum = number
case *ab.SeekPosition_Newest:
	stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
	stopNum = stop.Specified.Number
	if stopNum < number {
		logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
		return sendStatusReply(srv, cb.Status_BAD_REQUEST)
	}
}

其中,
chain.Reader().Height()
说明Height()对每个chain是不一样的

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

查找SeekInfo:

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/core/deliverservice/requester.go:
56
57 func (b *blocksRequester) seekOldest() error {
58: seekInfo := &orderer.SeekInfo{
59 Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
60 Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
..
73
74 func (b *blocksRequester) seekLatestFromCommitter(height uint64) error {
75: seekInfo := &orderer.SeekInfo{
76 Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
77 Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/orderer/common/performance/utils.go:
149 channelID,
150 localcis.NewSigner(),
151: &ab.SeekInfo{Start: seekOldest, Stop: seekSpecified(number), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY},
152 0,
153 0,

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/peer/channel/deliverclient.go:
58 ) *common.Envelope {
59
60: seekInfo := &ab.SeekInfo{
61 Start: position,
62 Stop: position,

12 matches across 10 files

filter:
/home/jia.hu/golibs/src/github.com/hyperledger/fabric/,-/vendor/,.go,-.pb.go,-*test.go

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// DeliverService is used to communicate with orderers to obtain
// new blocks and send them to the committer service.
// Delivery is managed per channel: every per-channel method takes the
// channel's chainID.
type DeliverService interface {
// StartDeliverForChannel dynamically starts delivery of new blocks from the
// ordering service to the peers of channel chainID.
// When the delivery finishes, the finalizer func is called.
// A non-nil error means delivery was not started.
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error

// StopDeliverForChannel dynamically stops delivery of new blocks from the
// ordering service to the peers of channel chainID.
StopDeliverForChannel(chainID string) error

// UpdateEndpoints supplies a new list of ordering service endpoints for
// channel chainID.
// NOTE(review): the error conditions are defined by the implementation and
// are not visible here — confirm before relying on them.
UpdateEndpoints(chainID string, endpoints []string) error

// Stop terminates the delivery service and closes the connection.
Stop()

}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

Searching 541 files for "blockProviders" (case sensitive, whole word)

/home/jia.hu/golibs/src/github.com/hyperledger/fabric/core/deliverservice/deliveryclient.go:
69 type deliverServiceImpl struct {
70 conf *Config
71: blockProviders map[string]blocksprovider.BlocksProvider
72 lock sync.RWMutex
73 stopping bool
..
100 ds := &deliverServiceImpl{
101 conf: conf,
102: blockProviders: make(map[string]blocksprovider.BlocksProvider),
103 }
104 if err := ds.validateConfiguration(); err != nil {
...
111 // Use chainID to obtain blocks provider and pass endpoints
112 // for update
113: if bp, ok := d.blockProviders[chainID]; ok {
114 // We have found specified channel so we can safely update it
115 bp.UpdateOrderingEndpoints(endpoints)
...
151 return errors.New(errMsg)
152 }
153: if _, exist := d.blockProviders[chainID]; exist {
154 errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
155 logger.Errorf(errMsg)
...
158 client := d.newClient(chainID, ledgerInfo)
159 logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
160: d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
161 go d.launchBlockProvider(chainID, finalizer)
162 }
...
166 func (d *deliverServiceImpl) launchBlockProvider(chainID string, finalizer func()) {
167 d.lock.RLock()
168: pb := d.blockProviders[chainID]
169 d.lock.RUnlock()
170 if pb == nil {
...
185 return errors.New(errMsg)
186 }
187: if client, exist := d.blockProviders[chainID]; exist {
188 client.Stop()
189: delete(d.blockProviders, chainID)
190 logger.Debug("This peer will stop pass blocks from orderer service to other peers")
191 } else {
...
204 d.stopping = true
205
206: for _, client := range d.blockProviders {
207 client.Stop()
208 }

9 matches in 1 file

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// NewBlocksProvider creates the blocks deliverer for channel chainID. The
// returned provider keeps the given stream client, gossip adapter and message
// crypto service, and takes its wrong-status threshold from the package-level
// wrongStatusThreshold value.
func NewBlocksProvider(chainID string, client streamClient, gossip GossipServiceAdapter, mcs api.MessageCryptoService) BlocksProvider {
	provider := new(blocksProviderImpl)
	provider.chainID = chainID
	provider.client = client
	provider.gossip = gossip
	provider.mcs = mcs
	provider.wrongStatusThreshold = wrongStatusThreshold
	return provider
}

// DeliverBlocks pulls messages from the ordering service and distributes the
// received blocks across peers. It loops until the provider is done, and
// closes the client when it returns.
//
// Status messages:
//   - SUCCESS is unexpected (the seek should never complete) and ends the loop.
//   - BAD_REQUEST and FORBIDDEN are counted; once the count exceeds
//     wrongStatusThreshold the loop ends. Any other status resets that count.
//   - Before retrying, the loop sleeps for min(maxRetryDelay, 2^k * 100ms),
//     where k grows by one per status message until the cap is reached, and
//     then disconnects the client.
//
// Block messages reset both counters. The block is serialized and verified; a
// block that fails either step is logged and skipped. A verified block is
// recorded to file, added to the local payload buffer and gossiped.
//
// A receive error or a message of unknown type ends the loop.
func (b *blocksProviderImpl) DeliverBlocks() {
	errorStatusCounter := 0
	statusCounter := 0
	defer b.client.Close()
	for !b.isDone() {
		msg, err := b.client.Recv()
		if err != nil {
			logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error())
			return
		}
		switch t := msg.Type.(type) {
		case *orderer.DeliverResponse_Status:
			if t.Status == common.Status_SUCCESS {
				logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
				return
			}
			if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
				logger.Errorf("[%s] Got error %v", b.chainID, t)
				errorStatusCounter++
				if errorStatusCounter > b.wrongStatusThreshold {
					logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
					return
				}
			} else {
				errorStatusCounter = 0
				logger.Warningf("[%s] Got error %v", b.chainID, t)
			}

			// Exponential backoff, capped at maxRetryDelay. The exponent stops
			// growing once the cap has been reached.
			maxDelay := float64(maxRetryDelay)
			currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
			time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
			if currDelay < maxDelay {
				statusCounter++
			}

			// BAD_REQUEST disconnects with false, every other status with true.
			if t.Status == common.Status_BAD_REQUEST {
				b.client.Disconnect(false)
			} else {
				b.client.Disconnect(true)
			}
			continue
		case *orderer.DeliverResponse_Block:
			errorStatusCounter = 0
			statusCounter = 0
			seqNum := t.Block.Header.Number

			marshaledBlock, err := proto.Marshal(t.Block)
			if err != nil {
				logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, seqNum, err)
				continue
			}
			if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock); err != nil {
				logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, seqNum, err)
				continue
			}

			// Added by huxiao: record every verified block to file.
			record.RecordNewBlockToFile(t.Block)

			numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
			// Create payload with a block received
			payload := createPayload(seqNum, marshaledBlock)
			// Use payload to create gossip message
			gossipMsg := createGossipMsg(b.chainID, payload)

			logger.Debugf("[%s] Adding payload locally, buffer seqNum = [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
			// Add payload to local state payloads buffer; a failure here is
			// logged but does not stop the block from being gossiped.
			if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
				logger.Warning("Failed adding payload of", seqNum, "because:", err)
			}

			// Gossip messages with other nodes
			logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, seqNum, numberOfPeers)
			b.gossip.Gossip(gossipMsg)
		default:
			// The format needs a verb for t; without it the value is rendered
			// as a "%!(EXTRA ...)" artifact.
			logger.Warningf("[%s] Received unknown: %v", b.chainID, t)
			return
		}
	}
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// Recv receives the next message from the ordering service. The receive is
// executed through bc.try; if the client should stop, the attempt fails with a
// "closing" error instead of touching the stream.
func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
	receive := func() (interface{}, error) {
		if bc.shouldStop() {
			return nil, errors.New("closing")
		}
		return bc.BlocksDeliverer.Recv()
	}

	result, err := bc.try(receive)
	if err != nil {
		return nil, err
	}
	return result.(*orderer.DeliverResponse), nil
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// Recv reads the next DeliverResponse from the underlying client stream.
// On a stream error it returns nil and that error. On success it prints a
// trace line to standard output before returning the message.
func (x *atomicBroadcastDeliverClient) Recv() (*DeliverResponse, error) {
	resp := &DeliverResponse{}
	err := x.ClientStream.RecvMsg(resp)
	if err != nil {
		return nil, err
	}

	// Debug trace emitted for every successfully received message.
	fmt.Println("atomicBroadcastDeliverClient Recv")
	return resp, nil
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// fetchFromPeers tries to obtain the private data still missing for block
// blockSeq from other peers.
//
// One digest is built per key in privateInfo.missingKeys and mapped to the
// sources recorded for that key; the digests are then passed to c.fetch. Every
// fetched read-write set whose key is still missing is stored in ownedRWsets,
// removed from privateInfo.missingKeys and persisted to the transient store.
// Fetched data that matches no missing key is ignored. If the fetch itself
// fails, a warning is logged and nothing is modified.
func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) {
	digestSources := make(map[*gossip2.PvtDataDigest][]*peer.Endorsement)
	privateInfo.missingKeys.foreach(func(missing rwSetKey) {
		logger.Debug("Fetching", missing, "from peers")
		digest := &gossip2.PvtDataDigest{
			TxId:       missing.txID,
			SeqInBlock: missing.seqInBlock,
			Collection: missing.collection,
			Namespace:  missing.namespace,
			BlockSeq:   blockSeq,
		}
		digestSources[digest] = privateInfo.sources[missing]
	})

	fetched, err := c.fetch(digestSources)
	if err != nil {
		logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err)
		return
	}

	// Walk over everything the peers returned.
	for _, element := range fetched {
		digest := element.Digest
		for _, rwSet := range element.Payload {
			// The key includes the hex-encoded SHA-256 of the payload.
			payloadHash := hex.EncodeToString(util2.ComputeSHA256(rwSet))
			key := rwSetKey{
				txID:       digest.TxId,
				namespace:  digest.Namespace,
				collection: digest.Collection,
				seqInBlock: digest.SeqInBlock,
				hash:       payloadHash,
			}

			if _, stillMissing := privateInfo.missingKeys[key]; stillMissing {
				ownedRWsets[key] = rwSet
				delete(privateInfo.missingKeys, key)
				// If we fetch private data that is associated to block i, then our last block persisted must be i-1
				// so our ledger height is i, since blocks start from 0.
				c.TransientStore.Persist(digest.TxId, blockSeq, key.toTxPvtReadWriteSet(rwSet))
				logger.Debug("Fetched", key)
				continue
			}

			logger.Debug("Ignoring", key, "because it wasn't found in the block")
		}
	}
}

@yfhk
Copy link
Owner Author

yfhk commented Nov 22, 2018

// fetch retrieves one block through the deliver client and writes its
// serialized form to a file.
//
// args[0] selects the block: "oldest", "newest", "config" (the block at the
// last-config index recorded in the newest block), or a block number in base
// 10. The number is parsed as an unsigned 64-bit integer, so negative or
// signed values are rejected as an illegal fetch target instead of wrapping
// around to a huge block number.
//
// args[1], when given, is the output file; otherwise the file is named
// "<channelID>_<args[0]>.block". When cf is nil a command factory is
// initialised first.
//
// It returns an error when no target or more than two arguments are given,
// when the target is illegal, or when fetching, serializing or writing the
// block fails.
func fetch(cmd *cobra.Command, args []string, cf *ChannelCmdFactory) error {
	var err error
	if cf == nil {
		cf, err = InitCmdFactory(EndorserNotRequired, OrdererRequired, channelID)
		if err != nil {
			return err
		}
	}

	if len(args) == 0 {
		return fmt.Errorf("fetch target required, oldest, newest, config, or a number")
	}

	if len(args) > 2 {
		return fmt.Errorf("trailing args detected")
	}

	var block *cb.Block

	switch args[0] {
	case "oldest":
		block, err = cf.DeliverClient.getOldestBlock()
	case "newest":
		block, err = cf.DeliverClient.getNewestBlock()
	case "config":
		// The newest block records the index of the last config block.
		iBlock, err2 := cf.DeliverClient.getNewestBlock()
		if err2 != nil {
			return err2
		}
		lc, err2 := utils.GetLastConfigIndexFromBlock(iBlock)
		if err2 != nil {
			return err2
		}
		block, err = cf.DeliverClient.getSpecifiedBlock(lc)
	default:
		// Block numbers are unsigned; ParseUint rejects negative input that a
		// signed parse followed by a uint64 conversion would silently wrap.
		num, err2 := strconv.ParseUint(args[0], 10, 64)
		if err2 != nil {
			return fmt.Errorf("fetch target illegal: %s", args[0])
		}
		block, err = cf.DeliverClient.getSpecifiedBlock(num)
	}

	if err != nil {
		return err
	}

	b, err := proto.Marshal(block)
	if err != nil {
		return err
	}

	var file string
	if len(args) == 1 {
		file = channelID + "_" + args[0] + ".block"
	} else {
		file = args[1]
	}

	return ioutil.WriteFile(file, b, 0644)
}

Repository owner deleted a comment Feb 2, 2024
Repository owner deleted a comment from LeonardoWlopes Feb 23, 2024
Repository owner deleted a comment Feb 26, 2024
@itsnotgunnar
Copy link

itsnotgunnar commented Feb 26, 2024 via email

@ackinc
Copy link

ackinc commented Mar 4, 2024 via email

@Hurricane31337
Copy link

Don't click on the job offer link, it's a scam!

Repository owner deleted a comment from qywang2012 Mar 4, 2024
Repository owner deleted a comment from roneyfraga Mar 18, 2024
@github-staff github-staff deleted a comment from naudachu May 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

7 participants
@Hurricane31337 @ackinc @yfhk @itsnotgunnar and others