A transaction is initiated by an RPC call from the client; in Polkadot this is called an extrinsic (an external transaction). The RPC layer places the extrinsic into the transaction pool, as the following code shows:
struct RpcTransactionPool {
    inner: Arc<Mutex<txpool::TransactionPool>>,
    network: Arc<network::Service>,
}

// The RPC author API is implemented on top of this pool wrapper.
impl substrate_rpc::author::AuthorApi for RpcTransactionPool {
    fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
        use primitives::hexdisplay::HexDisplay;
        use polkadot_runtime::UncheckedExtrinsic;
        use codec::Slicable;
        info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
        // Decode the submitted bytes into an UncheckedExtrinsic.
        let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
            .ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;
        info!("Correctly formatted: {:?}", decoded);
        // Import the decoded extrinsic into the transaction pool.
        self.inner.lock().import(decoded)
            .map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?;
        self.network.trigger_repropagate();
        Ok(())
    }
}
/// Called when new transactions are imported by the client.
pub fn trigger_repropagate(&self) {
    self.network.with_context(DOT_PROTOCOL_ID, |context| {
        // Propagate the pooled transactions to the connected peers.
        self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context));
    });
}
/// Called when we propagate ready transactions to peers.
pub fn propagate_transactions(&self, io: &mut SyncIo) {
    debug!(target: "sync", "Propagating transactions");
    // Accept transactions only when fully synced.
    if self.sync.read().status().state != SyncState::Idle {
        return;
    }
    let transactions = self.transaction_pool.transactions();
    // Walk every connected peer and work out what it still needs.
    let mut peers = self.peers.write();
    for (peer_id, ref mut peer) in peers.iter_mut() {
        let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)|
            if peer.known_transactions.insert(hash.clone()) { Some(t.clone()) } else { None }).collect();
        if !to_send.is_empty() {
            trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), peer_id);
            // Send only the transactions this peer has not seen yet.
            self.send_message(io, *peer_id, Message::Transactions(to_send));
        }
    }
}
The doc comment on trigger_repropagate says it plainly: the function fires whenever the client imports a new transaction. propagate_transactions (propagate, in the sense of spreading, is an apt name) first checks the sync state and returns immediately if the node is not idle, i.e. not fully synced; Bitcoin and Ethereum behave in much the same way. Otherwise it takes the pooled transactions and, based on what each peer already knows, sends out only the ones that are new to that peer.
Propagating transactions is a crucial step, and it is one of the important responsibilities of Service.rs.
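The per-peer filtering above hinges on HashSet::insert returning false for hashes the peer already knows. The following is a minimal, self-contained sketch of that bookkeeping; the types and names (Peer, select_for_peer, the u64 hash) are illustrative stand-ins, not the actual Polkadot types:
use std::collections::{HashMap, HashSet};

// Illustrative stand-ins; the real pool uses Substrate's hash and extrinsic types.
type Hash = u64;
type Tx = String;

struct Peer {
    known_transactions: HashSet<Hash>,
}

/// Pick only the transactions this peer has not seen yet and mark them as known,
/// mirroring the filter_map inside propagate_transactions.
fn select_for_peer(peer: &mut Peer, pool: &[(Hash, Tx)]) -> Vec<Tx> {
    pool.iter()
        .filter_map(|&(hash, ref tx)| {
            // insert returns true only if the hash was not already present
            if peer.known_transactions.insert(hash) { Some(tx.clone()) } else { None }
        })
        .collect()
}

fn main() {
    let pool: Vec<(Hash, Tx)> = vec![(1, "tx-a".into()), (2, "tx-b".into())];
    let mut peers: HashMap<&str, Peer> = HashMap::new();
    peers.insert("peer-1", Peer { known_transactions: HashSet::new() });

    for (peer_id, peer) in peers.iter_mut() {
        let to_send = select_for_peer(peer, &pool);
        println!("sending {} transactions to {}", to_send.len(), peer_id);
        // A second pass sends nothing: every hash is now in known_transactions,
        // so repeated repropagation does not flood the peer.
        assert!(select_for_peer(peer, &pool).is_empty());
    }
}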
Back to the transaction pool. Among the imports in the pool's code (polkadot/transaction_pool/src/lib.rs) there is the following line:
pub use transaction_pool::{Options, Status, LightStatus, NoopListener, VerifiedTransaction as VerifiedTransactionOps};
That is, it re-exports several pieces of the underlying transaction-pool crate: the pool options, the status types (including the light status) and the verified-transaction trait. The same code also touches blocks and block production, but let us first focus on the transaction pool itself:
/// The polkadot transaction pool.
///
/// Wraps a `transaction-pool::Pool`.
pub struct TransactionPool {
    inner: transaction_pool::Pool<VerifiedTransaction, Scoring>,
    insertion_index: u64, // TODO: use AtomicU64 when it stabilizes
}
impl TransactionPool {
    /// Create a new transaction pool.
    pub fn new(options: Options) -> Self {
        TransactionPool {
            inner: Pool::new(NoopListener, Scoring, options),
            insertion_index: 0,
        }
    }

    /// Verify and import a transaction into the pool.
    pub fn import(&mut self, xt: UncheckedExtrinsic) -> Result<Arc<VerifiedTransaction>> {
        let insertion_index = self.insertion_index;
        self.insertion_index += 1;
        // Build the verified form of the transaction (signature check included).
        let verified = VerifiedTransaction::create(xt, insertion_index)?;
        info!("Extrinsic verified {:?}. Importing...", verified);
        // TODO: just use a foreign link when the error type is made public.
        // Keep a copy of the hash so it can be reported if the import fails.
        let hash = verified.hash.clone();
        self.inner.import(verified)
            .map_err(|e|
                match e {
                    // TODO: make error types public in transaction_pool. For now just treat all errors as AlreadyImported
                    _ => ErrorKind::AlreadyImported(hash),
                    // transaction_pool::error::AlreadyImported(h) => ErrorKind::AlreadyImported(h),
                    // e => ErrorKind::Import(Box::new(e)),
                })
            .map_err(Into::into)
    }
}
// The following types live in the same file.
/// A verified transaction which should be includable and non-inherent.
#[derive(Debug, Clone)]
pub struct VerifiedTransaction {
    inner: <UncheckedExtrinsic as Checkable>::Checked, // the checked (signature-verified) form of the extrinsic
    hash: Hash,
    address: AccountId,
    insertion_id: u64,
    encoded_size: usize,
}
impl VerifiedTransaction {
    /// Attempt to verify a transaction.
    fn create(xt: UncheckedExtrinsic, insertion_id: u64) -> Result<Self> {
        if !xt.is_signed() {
            bail!(ErrorKind::IsInherent(xt))
        }
        let message = codec::Slicable::encode(&xt); // encode the extrinsic so it can be hashed
        // check() verifies the signature and yields the checked extrinsic.
        match xt.check() {
            Ok(xt) => {
                // TODO: make transaction-pool use generic types.
                let hash = substrate_primitives::hashing::blake2_256(&message);
                let address = xt.signed;
                Ok(VerifiedTransaction {
                    inner: xt,
                    hash: hash.into(),
                    encoded_size: message.len(),
                    address,
                    insertion_id,
                })
            }
            Err(xt) => Err(ErrorKind::BadSignature(xt).into()),
        }
    }
}
Once a transaction has passed verification it is returned, ready to be packed into a block.
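To summarize the flow of VerifiedTransaction::create: unsigned (inherent) extrinsics are rejected, the signature is checked via xt.check(), and the transaction is identified by hashing its encoded bytes. The sketch below is a simplified, self-contained model of that shape; UncheckedXt, VerifiedXt and the use of a standard-library hasher in place of blake2_256 are illustrative assumptions, and the actual signature check is omitted:
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy stand-in for an unchecked extrinsic: raw encoded payload plus an optional signer.
struct UncheckedXt {
    payload: Vec<u8>,
    signer: Option<String>,
}

/// Toy stand-in for the verified form that the pool keeps.
#[derive(Debug)]
struct VerifiedXt {
    hash: u64,
    address: String,
    encoded_size: usize,
}

impl UncheckedXt {
    fn is_signed(&self) -> bool {
        self.signer.is_some()
    }
}

/// Mirrors the shape of VerifiedTransaction::create: reject unsigned (inherent)
/// extrinsics, then identify the transaction by hashing its encoded bytes.
/// The real code additionally verifies the signature and uses blake2_256.
fn create(xt: UncheckedXt) -> Result<VerifiedXt, &'static str> {
    if !xt.is_signed() {
        return Err("inherent extrinsics cannot enter the pool");
    }
    let mut hasher = DefaultHasher::new();
    xt.payload.hash(&mut hasher);
    Ok(VerifiedXt {
        hash: hasher.finish(),
        address: xt.signer.unwrap(),
        encoded_size: xt.payload.len(),
    })
}

fn main() {
    let signed = UncheckedXt { payload: vec![1, 2, 3], signer: Some("alice".into()) };
    let inherent = UncheckedXt { payload: vec![4, 5], signer: None };
    println!("{:?}", create(signed));
    assert!(create(inherent).is_err());
}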
We saw the propagation code above; next, look at the concrete sending code it calls:
pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) {
    match &mut message {
        &mut Message::BlockRequest(ref mut r) => {
            let mut peers = self.peers.write();
            if let Some(ref mut peer) = peers.get_mut(&peer_id) {
                r.id = peer.next_request_id;
                peer.next_request_id = peer.next_request_id + 1;
                peer.block_request = Some(r.clone());
                peer.request_timestamp = Some(time::Instant::now());
            }
        },
        _ => (), // other message kinds need no extra bookkeeping
    }
    let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
    if let Err(e) = io.send(peer_id, data) { // hand the serialized bytes to the network IO
        debug!(target: "sync", "Error sending message: {:?}", e);
        io.disconnect_peer(peer_id);
    }
}
// network/src/io.rs
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError> {
    self.network.send(peer_id, 0, data)
}
The code above serializes the message into JSON bytes and pushes them out through the IO layer to the network interface. The crate behind that network interface is not in this repository, so it is unclear here whether it is an off-the-shelf library or custom code.
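For the serialization step itself, serde_json::to_vec simply turns the Message enum into JSON bytes. Below is a minimal sketch of that step, using a stand-in Message enum and assuming the serde (with derive) and serde_json crates:
use serde::Serialize;

/// Stand-in for the network Message enum; the real one has many more variants.
#[derive(Serialize)]
enum Message {
    Transactions(Vec<Vec<u8>>),
}

fn main() {
    let msg = Message::Transactions(vec![vec![0xde, 0xad], vec![0xbe, 0xef]]);
    // serde_json::to_vec produces the JSON bytes that io.send() then transmits.
    let data: Vec<u8> = serde_json::to_vec(&msg).expect("serializing plain data cannot fail");
    println!("{}", String::from_utf8_lossy(&data));
}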
Packing transactions into blocks is more involved on Polkadot because it happens in two stages: transactions are first packed on a parachain, and the result is queued and sent to the relay chain, where it is packed and confirmed again.
On Polkadot, blocks are produced through consensus. There is no fundamental difference between block production on a parachain and on the relay chain: blocks are produced by full nodes, while light nodes are generally only used for verification (polkadot/api/src/full.rs). To see the block-production process more clearly, start with its test:
#[test]
fn build_block() {
    let client = client();
    let id = client.check_id(BlockId::Number(0)).unwrap();
    let block_builder = client.build_block(&id, 1_000_000, Vec::new()).unwrap();
    let block = block_builder.bake();

    assert_eq!(block.header.number, 1);
    assert!(block.header.extrinsics_root != Default::default());
}
All it does is check the block id, build and bake a block, and verify the resulting header. Real block production follows essentially the same path:
fn propose_with(&self, candidates: Vec<CandidateReceipt>) -> Result<SubstrateBlock, Error> {
    // TODO: handle case when current timestamp behind that in state.
    let timestamp = current_timestamp();
    let mut block_builder = self.client.build_block(
        &self.parent_id,
        timestamp,
        candidates,
    )?;

    let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);

    {
        let mut pool = self.transaction_pool.lock();
        let mut unqueue_invalid = Vec::new();
        let mut pending_size = 0;

        // Drop transactions that are no longer ready, then walk the pending queue.
        pool.cull(None, readiness_evaluator.clone());
        for pending in pool.pending(readiness_evaluator.clone()) {
            // skip and cull transactions which are too large.
            if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
                unqueue_invalid.push(pending.hash().clone());
                continue
            }

            // Stop once the block's transaction size budget is exhausted.
            if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }

            match block_builder.push_extrinsic(pending.as_transaction().clone()) {
                Ok(()) => {
                    pending_size += pending.encoded_size();
                }
                Err(e) => {
                    trace!(target: "transaction-pool", "Invalid transaction: {}", e);
                    unqueue_invalid.push(pending.hash().clone());
                }
            }
        }

        // Remove everything that failed to make it into the block.
        for tx_hash in unqueue_invalid {
            pool.remove(&tx_hash, false);
        }
    }

    let polkadot_block = block_builder.bake();

    info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]",
        polkadot_block.header.number,
        Hash::from(polkadot_block.header.blake2_256()),
        polkadot_block.header.parent_hash,
        polkadot_block.extrinsics.iter()
            .map(|xt| format!("{}", Hash::from(xt.blake2_256())))
            .collect::<Vec<_>>()
            .join(", ")
    );

    let substrate_block = Slicable::decode(&mut polkadot_block.encode().as_slice())
        .expect("polkadot blocks defined to serialize to substrate blocks correctly; qed");

    // TODO: full re-evaluation
    let active_parachains = self.client.active_parachains(&self.parent_id)?;
    assert!(evaluation::evaluate_initial(
        &substrate_block,
        timestamp,
        &self.parent_hash,
        self.parent_number,
        &active_parachains,
    ).is_ok());

    Ok(substrate_block)
}
// Block creation is driven from this Future implementation.
impl<C, R, P> Future for CreateProposal<C, R, P>
    where
        C: PolkadotApi,
        R: TableRouter,
        P: Collators,
{
    type Item = SubstrateBlock;
    type Error = Error;

    fn poll(&mut self) -> Poll<SubstrateBlock, Error> {
        // 1. poll local collation future.
        match self.collation.poll() {
            Ok(Async::Ready((collation, extrinsic))) => {
                let hash = collation.receipt.hash();
                self.router.local_candidate_data(hash, collation.block_data, extrinsic);

                // TODO: if we are an availability guarantor also, we should produce an availability statement.
                self.table.sign_and_import(&self.router, GenericStatement::Candidate(collation.receipt));
            }
            Ok(Async::NotReady) => {},
            Err(_) => {}, // TODO: handle this failure to collate.
        }

        // 2. try to propose if we have enough includable candidates and other
        // delays have concluded.
        let included = self.table.includable_count();
        try_ready!(self.timing.poll(included));

        // 3. propose: this is where the block is actually produced.
        let proposed_candidates = self.table.with_proposal(|proposed_set| {
            proposed_set.into_iter().cloned().collect()
        });

        self.propose_with(proposed_candidates).map(Async::Ready)
    }
}
At this point the block has its transactions packed in; the next step is to reach consensus on the block.