在启动的流程里其实是挖了一些坑的,至少网络的坑在这次分析中尽量填上,在网络分析中重点分析一下网络发现,由于对RUST不是多熟悉,并且波卡链中使用了好多的第三方库,所以界定起来可能有点麻烦。
接着上篇分析,从network.start_network()开始,它会调用selt.start()(substrate/network/src/service.rs):
fn start(&self) {
match self.network.start().map_err(Into::into) {
Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port {:?} is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")),
Err(err) => warn!("Error starting network: {}", err),
_ => {},
};
self.network.register_protocol(self.handler.clone(), DOT_PROTOCOL_ID, &[(0, V0_PACKET_COUNT)])
.unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e));
}
这个函数就两个功能:启动网络并注册通信协议。同时处理这两个功能实现时出现的异常。
下面看一下连接上来的节点的控制:
/// On-demand service API.
pub trait OnDemandService: Send + Sync {
/// When new node is connected.
fn on_connect(&self, peer: PeerId, role: service::Role);
/// When node is disconnected.
fn on_disconnect(&self, peer: PeerId);
/// Maintain peers requests.
fn maintain_peers(&self, io: &mut SyncIo);
/// When response is received from remote node.
fn on_remote_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse);
}
/// On-demand requests service. Dispatches requests to appropriate peers.
pub struct OnDemand<E: service::ExecuteInContext> {
core: Mutex<OnDemandCore<E>>,
checker: Arc<FetchChecker>,
}
//赋值给默认值使用的属性
#[derive(Default)]
struct OnDemandCore<E: service::ExecuteInContext> {
service: Weak<E>,
next_request_id: u64,
pending_requests: VecDeque<Request>,
active_peers: LinkedHashMap<PeerId, Request>,
idle_peers: VecDeque<PeerId>,
}
上面的三个数据结构是管理服务中的基础数据结构,在整个服务中都用得到。看一下如何控制连接节点。
fn on_connect(&self, peer: PeerId, role: service::Role) {
if !role.intersects(service::Role::FULL | service::Role::COLLATOR | service::Role::VALIDATOR) { // TODO: correct?
return;
}
let mut core = self.core.lock();
core.add_peer(peer);
core.dispatch();
}
pub fn add_peer(&mut self, peer: PeerId) {
self.idle_peers.push_back(peer);
}
先创建空闲节点,然后再得到连接数据后将其与实际的远端节点绑定:
pub fn dispatch(&mut self) {
let service = match self.service.upgrade() {
Some(service) => service,
None => return,
};
while !self.pending_requests.is_empty() {
let peer = match self.idle_peers.pop_front() {
Some(peer) => peer,
None => return,
};
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
request.timestamp = Instant::now();
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
service.execute_in_context(|ctx, protocol| {
let message = message::RemoteCallRequest {
id: request.id,
block: request.request.block,
method: request.request.method.clone(),
data: request.request.call_data.clone(),
};
protocol.send_message(ctx, peer, message::Message::RemoteCallRequest(message))
});
self.active_peers.insert(peer, request);
}
}
//发送请求的块内容,如果出错,断开PEER。
pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) {
match &mut message {
&mut Message::BlockRequest(ref mut r) => {
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
r.id = peer.next_request_id;
peer.next_request_id = peer.next_request_id + 1;
peer.block_request = Some(r.clone());
peer.request_timestamp = Some(time::Instant::now());
}
},
_ => (),
}
let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
if let Err(e) = io.send(peer_id, data) {
debug!(target:"sync", "Error sending message: {:?}", e);
io.disconnect_peer(peer_id);
}
}
类似于所有的区块链都要有一个分发的处理函数(network/src/protocol.rs),由read函数调用其它(network/src/service.rs=>fn read())来实现:
pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) {
let message: Message = match serde_json::from_slice(data) {
Ok(m) => m,
Err(e) => {
debug!("Invalid packet from {}: {}", peer_id, e);
io.disable_peer(peer_id);
return;
}
};
//又看到了匹配
match message {
Message::Status(s) => self.on_status_message(io, peer_id, s),
Message::BlockRequest(r) => self.on_block_request(io, peer_id, r),
Message::BlockResponse(r) => {
let request = {
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
peer.request_timestamp = None;
match mem::replace(&mut peer.block_request, None) {
Some(r) => r,
None => {
debug!("Unexpected response packet from {}", peer_id);
io.disable_peer(peer_id);
return;
}
}
} else {
debug!("Unexpected packet from {}", peer_id);
io.disable_peer(peer_id);
return;
}
};
if request.id != r.id {
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", peer_id, request.id, r.id);
return;
}
self.on_block_response(io, peer_id, request, r);
},
Message::BlockAnnounce(announce) => {
self.on_block_announce(io, peer_id, announce);
},
Message::Statement(s) => self.on_statement(io, peer_id, s, blake2_256(data).into()),
Message::CandidateRequest(r) => self.on_candidate_request(io, peer_id, r),
Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r),
Message::BftMessage(m) => self.on_bft_message(io, peer_id, m, blake2_256(data).into()),
Message::Transactions(m) => self.on_transactions(io, peer_id, m),
Message::RemoteCallRequest(request) => self.on_remote_call_request(io, peer_id, request),
Message::RemoteCallResponse(response) => self.on_remote_call_response(io, peer_id, response)
}
}
首先得到message,判断其是否正确。下来就是对消息进行分类处理,如果状态,声明,区块请求,侯选请求等.这个其实就类似于其它语言中的switch-case语句。在每个本地的self的函数中,调用相关的变量匹配函数,这个其实就是RUST中的use的用法,将其它名字空间的函数引用到本地来,这里举一个例子:self.on_candidate_request(io, peer_id, r)
fn on_candidate_request(&self, io: &mut SyncIo, peer: PeerId, request: message::CandidateRequest) {
trace!(target: "sync", "CandidateRequest {} from {} for {}", request.id, peer, request.hash);
self.consensus.lock().on_candidate_request(io, self, peer, request);
}
//network/src/consensus.rs
pub fn on_candidate_request(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, request: message::CandidateRequest) {
let response = match self.our_candidate {
Some((ref hash, ref data)) if *hash == request.hash => Some(data.clone()),
_ => None,
};
let msg = message::CandidateResponse {
id: request.id,
data: response,
};
protocol.send_message(io, peer_id, Message::CandidateResponse(msg));
}
在共识算法中,侯选人的请求函数,因为在波卡链中使用是一个基于BFT的算法,所以它仍然会有一个选举的过程。
在一个collator中,会创建一个平行链的全节点:
Box::new(stream::futures_unordered(egress_fetch)
.fold(BTreeMap::new(), |mut map, (routing_id, egresses)| {
for (depth, egress) in egresses.into_iter().rev().enumerate() {
let depth = -(depth as i64);
map.insert((depth, routing_id), egress);
}
Ok(map)
})
.map(|ordered| ordered.into_iter().map(|((_, id), egress)| (id, egress)))
.map(|i| i.collect::<Vec<_>>())
.map(ConsolidatedIngress))
}
/// Produce a candidate for the parachain.
pub fn collate<'a, R, P>(local_id: ParaId, relay_context: R, para_context: P)
-> Box<Future<Item=parachain::Candidate, Error=R::Error> + 'a>
where
R: RelayChainContext,
R::Error: 'a,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
{
Box::new(collate_ingress(relay_context).map(move |ingress| {
let (block_data, _, signature) = para_context.produce_candidate(
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
);
parachain::Candidate {
parachain_index: local_id,
collator_signature: signature,
block: block_data,
unprocessed_ingress: ingress,
}
}))
}
在map中会注册相关的路由信息。这里其实是通向链路由的一个必经之路。然后在共识的服务中会有一个路由的控制(consensus/src/service.rs),目的是可以顺利的找到共识的节点:
type FetchCandidateAdapter = future::Map<net::FetchFuture, fn(Vec<u8>) -> BlockData>;
#[derive(Clone)]
struct Router {
network: Arc<net::ConsensusService>,
}
impl Router {
fn fetch_candidate_adapter(data: Vec<u8>) -> BlockData {
BlockData(data)
}
}
impl TableRouter for Router {
type Error = Canceled;
type FetchCandidate = FetchCandidateAdapter;
type FetchExtrinsic = future::FutureResult<Extrinsic, Self::Error>;
fn local_candidate_data(&self, hash: Hash, block_data: BlockData, _extrinsic: Extrinsic) {
let data = block_data.0;
self.network.set_local_candidate(Some((hash, data)))
}
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate {
let hash = candidate.hash();
self.network.fetch_candidate(&hash).map(Self::fetch_candidate_adapter)
}
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
future::ok(Extrinsic)
}
}
它的意义在于批准不同的平行链的区块后就可以进行中继链的区块的批准,然后正式提交区块。另外在协议中也会有相关的配置:
/// Called when a new peer is connected
pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id));
self.handshaking_peers.write().insert(peer_id, time::Instant::now());
self.send_status(io, peer_id);
}
/// Send Status message
fn send_status(&self, io: &mut SyncIo, peer_id: PeerId) {
if let Ok(info) = self.chain.info() {
let status = message::Status {
version: PROTOCOL_VERSION,
genesis_hash: info.chain.genesis_hash,
roles: self.config.roles.into(),
best_number: info.chain.best_number,
best_hash: info.chain.best_hash,
validator_signature: None,
validator_id: None,
parachain_id: None,
};
self.send_message(io, peer_id, Message::Status(status))
}
}
这就和前面的服务启动对应上了,在启动时,会自动注册相关的节点状态。然后配以路由,进行转发。