Skip to content

Latest commit

 

History

History
334 lines (291 loc) · 11.1 KB

波卡链源码分析之四交易.md

File metadata and controls

334 lines (291 loc) · 11.1 KB

波卡链源码分析之四交易


一、交易的发起


交易的发起从客户端的RPC发起,在波卡链中,把这个称做外部交易。外部交易将被RPC打包到交易池中,代码如下:
struct RpcTransactionPool {
	inner: Arc<Mutex<txpool::TransactionPool>>,
	network: Arc<network::Service>,
}
//这里使用并实现了trait
impl substrate_rpc::author::AuthorApi for RpcTransactionPool {
	fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
		use primitives::hexdisplay::HexDisplay;
		use polkadot_runtime::UncheckedExtrinsic;
		use codec::Slicable;

		info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
		//通过XT交易来进行转换,这个在以后会根据情况进行处理。
		let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
			.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;

		info!("Correctly formatted: {:?}", decoded);

    //导入到交易池
		self.inner.lock().import(decoded)
			.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?;

		self.network.trigger_repropagate();
		Ok(())
	}
}
/// Called when new transactons are imported by the client.
pub fn trigger_repropagate(&self) {
	self.network.with_context(DOT_PROTOCOL_ID, |context| {
		//分发交易
		self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context));
	});
}
/// Called when we propagate ready transactions to peers.
pub fn propagate_transactions(&self, io: &mut SyncIo) {
	debug!(target: "sync", "Propagating transactions");

	// Accept transactions only when fully synced
	if self.sync.read().status().state != SyncState::Idle {
		return;
	}

	let transactions = self.transaction_pool.transactions();

  //得到可发送节点
	let mut peers = self.peers.write();
	for (peer_id, ref mut peer) in peers.iter_mut() {
		let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)|
			if peer.known_transactions.insert(hash.clone()) { Some(t.clone()) } else { None }).collect();
		if !to_send.is_empty() {
			trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), peer_id);
			//发送交易
			self.send_message(io, *peer_id, Message::Transactions(to_send));
		}
	}
}

trigger_repropagate函数的注释说得很清楚,当一个新的交易被客户端导入时,触发这个函数。propagate_transactions这个函数(propagate有繁殖扩散的意思,很贴切)会判断一下节点的状态,如果非空闲就直接返回(基本上比特币和以太坊也是这个调调)。否则根据节点中的状态发送得到的交易。
传播交易其实是非常重要的一步,这也是Service.rs这个文件的重要一个功能。
回到交易池,在交易池的代码(polkadot\transaction_pool\src\lib.rs)中的库的引用中有下面一段代码:
pub use transaction_pool::{Options, Status, LightStatus, NoopListener, VerifiedTransaction as VerifiedTransactionOps};
也即是说,把几种状态(轻节点,校验等)都引入了进来。在这个池子的代码中,区块和区块产生也包含在内了,先重点分析一下交易池:
/// The polkadot transaction pool.
///
/// Wraps a `transaction-pool::Pool`.
pub struct TransactionPool {
	inner: transaction_pool::Pool<VerifiedTransaction, Scoring>,
	insertion_index: u64, // TODO: use AtomicU64 when it stabilizes
}

impl TransactionPool {
	/// Create a new transaction pool.
	pub fn new(options: Options) -> Self {
		TransactionPool {
			inner: Pool::new(NoopListener, Scoring, options),
			insertion_index: 0,
		}
	}

	/// Verify and import a transaction into the pool.这里将交易导入交易池并做检验
	pub fn import(&mut self, xt: UncheckedExtrinsic) -> Result<Arc<VerifiedTransaction>> {
		let insertion_index = self.insertion_index;
		self.insertion_index += 1;

    //生成一个检查器
		let verified = VerifiedTransaction::create(xt, insertion_index)?;

		info!("Extrinsic verified {:?}. Importing...", verified);

		// TODO: just use a foreign link when the error type is made public.
		//将错误保存
		let hash = verified.hash.clone();
		self.inner.import(verified)
			.map_err(|e|
				match e {
					// TODO: make error types public in transaction_pool. For now just treat all errors as AlreadyImported
					_ => ErrorKind::AlreadyImported(hash),
					// transaction_pool::error::AlreadyImported(h) => ErrorKind::AlreadyImported(h),
					// e => ErrorKind::Import(Box::new(e)),
				})
			.map_err(Into::into)
	}
	//它们都在同一个文件中
	/// A verified transaction which should be includable and non-inherent.
#[derive(Debug, Clone)]
pub struct VerifiedTransaction {
	inner: <UncheckedExtrinsic as Checkable>::Checked,//未检查的外部交易状态
	hash: Hash,
	address: AccountId,
	insertion_id: u64,
	encoded_size: usize,
}

impl VerifiedTransaction {
	/// Attempt to verify a transaction.
	fn create(xt: UncheckedExtrinsic, insertion_id: u64) -> Result<Self> {
		if !xt.is_signed() {
			bail!(ErrorKind::IsInherent(xt))
		}

		let message = codec::Slicable::encode(&xt);//处理消息数据
		//熟悉的switch
		match xt.check() {
			Ok(xt) => {
				// TODO: make transaction-pool use generic types.
				let hash = substrate_primitives::hashing::blake2_256(&message);
				let address = xt.signed;
				Ok(VerifiedTransaction {
					inner: xt,
					hash: hash.into(),
					encoded_size: message.len(),
					address,
					insertion_id,
				})
			}
			Err(xt) => Err(ErrorKind::BadSignature(xt).into()),
		}
	}

经过检验后的交易就将其返回,准备打包到区块上。

二、交易转发


前面看到了分发的代码,接着分析具体的调用代码:
pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) {
	match &mut message {
		&mut Message::BlockRequest(ref mut r) => {
			let mut peers = self.peers.write();
			if let Some(ref mut peer) = peers.get_mut(&peer_id) {
				r.id = peer.next_request_id;
				peer.next_request_id = peer.next_request_id + 1;
				peer.block_request = Some(r.clone());
				peer.request_timestamp = Some(time::Instant::now());
			}
		},
		_ => (),  //否则默认返回一个空元组
	}
	let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
	if let Err(e) = io.send(peer_id, data) {//调用 IO发送
		debug!(target:"sync", "Error sending message: {:?}", e);
		io.disconnect_peer(peer_id);
	}
}
//network/src/io.rs
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError>{
	self.network.send(peer_id, 0, data)
}

上面的代码把字符串序列化后,通过IO发送到网络接口上。这里的网络接口的库包没找到,不知道是原生库的还是自己写的。

三、交易的打包和上链


交易的打包上链在波卡链上比较麻烦,它主要分成两部分,也就是说一个是平行链上的打包交易,然后生成队列再发送到中继链上进行打包交易确认。也就是说,它是分成两部分的。
在波卡链上,块的生成是通过共识来产生的。平行链和中继链的块产生没有本质区别,区块产生在全节点上,轻节点一般只是用来做验证(polkadot/api/src/full.rs)。为了能够更清晰的看出区块产生的过程,先看一下它的测试函数:
#[test]
fn build_block() {
	let client = client();

	let id = client.check_id(BlockId::Number(0)).unwrap();
	let block_builder = client.build_block(&id, 1_000_000, Vec::new()).unwrap();
	let block = block_builder.bake();

	assert_eq!(block.header.number, 1);
	assert!(block.header.extrinsics_root != Default::default());
}

它其实只调用了ID检查,创建区块和区块头的检验。实际的产生应该和此类似。
fn propose_with(&self, candidates: Vec<CandidateReceipt>) -> Result<SubstrateBlock, Error> {
		// TODO: handle case when current timestamp behind that in state.
		let timestamp = current_timestamp();
		let mut block_builder = self.client.build_block(
			&self.parent_id,
			timestamp,
			candidates,
		)?;

		let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);

		{
			let mut pool = self.transaction_pool.lock();
			let mut unqueue_invalid = Vec::new();
			let mut pending_size = 0;

			pool.cull(None, readiness_evaluator.clone());
			for pending in pool.pending(readiness_evaluator.clone()) {
				// skip and cull transactions which are too large.
				if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
					unqueue_invalid.push(pending.hash().clone());
					continue
				}

				if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }

				match block_builder.push_extrinsic(pending.as_transaction().clone()) {
					Ok(()) => {
						pending_size += pending.encoded_size();
					}
					Err(e) => {
						trace!(target: "transaction-pool", "Invalid transaction: {}", e);
						unqueue_invalid.push(pending.hash().clone());
					}
				}
			}

			for tx_hash in unqueue_invalid {
				pool.remove(&tx_hash, false);
			}
		}

		let polkadot_block = block_builder.bake();

		info!("Proposing block [number: {}; hash: {}; parent_hash: {}; extrinsics: [{}]]",
			polkadot_block.header.number,
			Hash::from(polkadot_block.header.blake2_256()),
			polkadot_block.header.parent_hash,
			polkadot_block.extrinsics.iter()
				.map(|xt| format!("{}", Hash::from(xt.blake2_256())))
				.collect::<Vec<_>>()
				.join(", ")
		);

		let substrate_block = Slicable::decode(&mut polkadot_block.encode().as_slice())
			.expect("polkadot blocks defined to serialize to substrate blocks correctly; qed");

		// TODO: full re-evaluation
		let active_parachains = self.client.active_parachains(&self.parent_id)?;
		assert!(evaluation::evaluate_initial(
			&substrate_block,
			timestamp,
			&self.parent_hash,
			self.parent_number,
			&active_parachains,
		).is_ok());

		Ok(substrate_block)
	}

//这里调用创建区块部分
impl<C, R, P> Future for CreateProposal<C, R, P>
	where
		C: PolkadotApi,
		R: TableRouter,
		P: Collators,
{
	type Item = SubstrateBlock;
	type Error = Error;

	fn poll(&mut self) -> Poll<SubstrateBlock, Error> {
		// 1. poll local collation future.
		match self.collation.poll() {
			Ok(Async::Ready((collation, extrinsic))) => {
				let hash = collation.receipt.hash();
				self.router.local_candidate_data(hash, collation.block_data, extrinsic);

				// TODO: if we are an availability guarantor also, we should produce an availability statement.
				self.table.sign_and_import(&self.router, GenericStatement::Candidate(collation.receipt));
			}
			Ok(Async::NotReady) => {},
			Err(_) => {}, // TODO: handle this failure to collate.
		}

		// 2. try to propose if we have enough includable candidates and other
		// delays have concluded.
		let included = self.table.includable_count();
		try_ready!(self.timing.poll(included));

		// 3. propose这里调用区块的生产
		let proposed_candidates = self.table.with_proposal(|proposed_set| {
			proposed_set.into_iter().cloned().collect()
		});

		self.propose_with(proposed_candidates).map(Async::Ready)
	}
}

这样一个区块就把交易打包好了,下一步就是对区块进行共识。