并行的状态控制主要有两个一个是WorldState.scala,另外一个是BlockWorldState.scala。从形式上来看二者是组合的关系,但实际应该是依赖或者实现的关系,不太明白Scala的用法。
object BlockWorldState {
sealed trait RaceCondition
case object OnAddress extends RaceCondition
case object OnAccount extends RaceCondition
case object OnStorage extends RaceCondition
case object OnCode extends RaceCondition
......
def apply(
blockchain: Blockchain,
accountNodeStorage: NodeKeyValueStorage,
storageNodeStorage: NodeKeyValueStorage,
accountStartNonce: UInt256,
stateRootHash: Option[Hash] = None
): BlockWorldState = {
/**
* Returns an accounts state trie "The world state (state), is a mapping
* between Keccak 256-bit hashes of the addresses (160-bit identifiers) and account states
* (a data structure serialised as RLP [...]).
* Though not stored on the blockchain, it is assumed that the implementation will maintain this mapping in a
* modified Merkle Patricia tree [...])."
*
* See [[http://paper.gavwood.com YP 4.1]]
*/
val underlyingAccountsTrie = MerklePatriciaTrie[Address, Account](
stateRootHash.getOrElse(Hash(trie.EmptyTrieHash)).bytes,
accountNodeStorage
)(Address.hashedAddressEncoder, Account.accountSerializer)
new BlockWorldState(
blockchain,
accountNodeStorage,
storageNodeStorage,
accountStartNonce,
blockchain.evmCodeStorage,
TrieAccounts(underlyingAccountsTrie),
Map(),
Map(),
Map(),
Map(),
Set(),
None
)
}
}
在他的下面是伴生类BlockWorldState
final class BlockWorldState private (
blockchain: Blockchain,
accountNodeStorage: NodeKeyValueStorage,
storageNodeStorage: NodeKeyValueStorage,
accountStartNonce: UInt256,
evmCodeStorage: EvmCodeStorage,
private var trieAccounts: TrieAccounts,
private var trieStorages: Map[Address, TrieStorage],
private var codes: Map[Address, ByteString],
private var accountDeltas: Map[Address, Vector[BlockWorldState.AccountDelta]],
private var raceConditions: Map[BlockWorldState.RaceCondition, Set[Address]],
private var touchedAddresses: Set[Address], // for debug
private var stx: Option[SignedTransaction] // for debug
) extends WorldState[BlockWorldState, TrieStorage]
通过上面的类和它的伴生对象可以看出,竞态条件并不太多,只有四个,即OnAddress、OnAccount、OnStorage、OnCode。和前面的说明是呼应的,其实在合并状态时,重点还是关注前三个,Code在执行时就会进行处理。
在状态的类成员中,有大量的状态获取函数,用来从当前世界状态中取得当前的状态值。事务的回滚也是靠这些数据来实现的,同时,由于只考虑记载当前交易状态,所以取消了对锁的控制。
并行的上下文主要有两个类:
object ProgramState {
trait ParallelRace
case object OnAccount extends ParallelRace
case object OnError extends ParallelRace
}
/**
* Intermediate state updated with execution of each opcode in the program
*
* @param context the context which initiates the program
* @param gas current gas for the execution
* @param stack current stack
* @param memory current memory
* @param pc program counter - an index of the opcode in the program to be executed
* @param returnData data to be returned from the program execution
* @param gasRefund the amount of gas to be refunded after execution (not sure if a separate field is required)
* @param addressesToDelete list of addresses of accounts scheduled to be deleted
* @param halted a flag to indicate program termination
* @param error indicates whether the program terminated abnormally
*/
final class ProgramState[W <: WorldState[W, S], S <: Storage[S]](val context: ProgramContext[W, S], val isDebugTraceEnabled: Boolean) {
import ProgramState._
var gas: Long = context.startGas
var world: W = context.world
var addressesToDelete: Set[Address] = context.initialAddressesToDelete
var addressesTouched: Set[Address] = context.initialAddressesTouched
var pc: Int = 0
var returnData: ByteString = ByteString()
var gasRefund: Long = 0
var txLogs: Vector[TxLogEntry] = Vector()
private var _halted: Boolean = false
var error: Option[ProgramError] = None
private var _isRevert: Boolean = false
var returnDataBuffer: ByteString = ByteString()
private var _parallelRaceConditions = Set[ParallelRace]()
val stack: Stack = Stack.empty()
val memory: Memory = Memory.empty()
......
def parallelRaceConditions = _parallelRaceConditions
def withParallelRaceCondition(race: ParallelRace) = {
this._parallelRaceConditions +=
this
}
def mergeParallelRaceConditions(races: Set[ParallelRace]) = {
this._parallelRaceConditions ++= racesrace
this
}
......
}
另外一个是程序的上下文:
object ProgramContext {
def apply[W <: WorldState[W, S], S <: Storage[S]](
stx: SignedTransaction,
recipientAddress: Address,
program: Program,
blockHeader: BlockHeader,
world: W,
config: EvmConfig,
initialAddressesToDelete: Set[Address],
initialAddressesTouched: Set[Address],
isStaticCall: Boolean
): ProgramContext[W, S] = {
// YP eq (91)
val inputData = if (stx.tx.isContractCreation) ByteString() else stx.tx.payload
val env = ExecEnv(
recipientAddress,
stx.sender,
stx.sender,
stx.tx.gasPrice,
inputData,
stx.tx.value,
program,
blockHeader,
callDepth = 0
)
val startGas = stx.tx.gasLimit - config.calcTransactionIntrinsicGas(stx.tx.payload, stx.tx.isContractCreation)
ProgramContext(env, recipientAddress, startGas, world, config, initialAddressesToDelete, initialAddressesTouched, isStaticCall)
}
}
/**
* Input parameters to a program executed on the EVM. Apart from the code itself
* it should have all (interfaces to) the data accessible from the EVM.
*
* @param env set of constants for the execution
* @param targetAddress used for determining whether a precompiled contract is being called (potentially
* different from the addresses defined in env)
* @param startGas initial gas for the execution
* @param world provides interactions with world state
* @param config evm config
* @param initialAddressesToDelete contains initial set of addresses to delete (from lower depth calls)
*/
final case class ProgramContext[W <: WorldState[W, S], S <: Storage[S]](
env: ExecEnv,
targetAddress: Address,
startGas: Long,
world: W,
config: EvmConfig,
initialAddressesToDelete: Set[Address],
initialAddressesTouched: Set[Address],
isStaticCall: Boolean
)
这两个类互相扶持,掌握着程序的上下文的状态,可以从其中得到世界状态,或者这样说,通过programstate来进行程序和并行交易的控制,包括并行的数量设置,都在这个类中。
状态和上下文结合后,开始在两个地方进行处理,一个是VM,一个是Ledger中。基本上就是在执行区块这个函数命令中,来回穿梭调用两个相关的依赖对象。
case object BALANCE extends OpCode[UInt256](0x31, 1, 1) with ConstGas[UInt256] {
protected def constGasFn(s: FeeSchedule) = s.G_balance
protected def getParams[W <: WorldState[W, S], S <: Storage[S]](state: ProgramState[W, S]) = {
val List(accountAddress) = state.stack.pop()
accountAddress
}
protected def exec[W <: WorldState[W, S], S <: Storage[S]](state: ProgramState[W, S], params: UInt256): ProgramState[W, S] = {
val accountAddress = params
val accountBalance = state.world.getBalance(Address(accountAddress))
state.stack.push(accountBalance)
state.withParallelRaceCondition(ProgramState.OnAccount).step()
}
}
合并在块状态中的代码:
private[ledger] def commit(): BlockWorldState = {
trieAccounts = trieAccounts.commit()
this
}
/**
* Should be called adter committed
*/
def persist(): BlockWorldState = {
// deduplicate codes first
this.codes.foldLeft(Map[Hash, ByteString]()) {
case (acc, (address, code)) => acc + (Hash(crypto.kec256(code)) -> code)
} foreach {
case (hash, code) => evmCodeStorage + (hash -> code)
}
this.trieStorages.foreach {
case (address, storageTrie) => storageTrie.underlying.persist()
}
this.trieAccounts.underlying.persist()
this
}
// --- merge ---
def mergeRaceConditions(later: BlockWorldState): BlockWorldState = {
later.raceConditions foreach {
case (k, vs) => this.raceConditions += (k -> (this.raceConditions.getOrElse(k, Set()) ++ vs))
}
this
}
private[ledger] def merge(later: BlockWorldState): Either[Map[RaceCondition, Set[Address]], BlockWorldState] = {
val raceCondiftions = this.raceConditions.foldLeft(Map[RaceCondition, Set[Address]]()) {
case (acc, (OnAccount, addresses)) => acc + (OnAccount -> addresses.filter(later.trieAccounts.logs.contains))
case (acc, (OnStorage, addresses)) => acc + (OnStorage -> addresses.filter(later.trieStorages.contains))
case (acc, (OnCode, addresses)) => acc + (OnCode -> addresses.filter(later.codes.contains))
case (acc, (OnAddress, addresses)) => acc + (OnAddress -> addresses.filter(x => later.codes.contains(x) || later.trieStorages.contains(x) || later.trieAccounts.logs.contains(x)))
} filter (_._2.nonEmpty)
if (raceCondiftions.isEmpty) {
val toMerge = this.copy
toMerge.touchedAddresses ++= later.touchedAddresses
//mergeAccountTrieAccount_simple(toMerge, that)
toMerge.mergeAccountTrieAccount(later).mergeTrieStorage(later).mergeCode(later).mergeRaceConditions(later)
Right(toMerge)
} else {
Left(raceCondiftions)
}
}
/** mergeAccountTrieAccount should work now, mergeAccountTrieAccount_simple is left here for reference only */
private def mergeAccountTrieAccount_simple(later: BlockWorldState): BlockWorldState = {
this.trieAccounts.logs ++= later.trieAccounts.logs
this
}
private def mergeAccountTrieAccount(later: BlockWorldState): BlockWorldState = {
val alreadyMergedAddresses = later.accountDeltas map {
case (address, deltas) =>
val valueMerged = deltas.foldLeft(this.getAccount(address).getOrElse(this.emptyAccount)) {
case (acc, AccountDelta(nonce, balance, _, _)) => acc.increaseNonce(nonce).increaseBalance(balance)
}
// just put the lasted stateRoot and codeHash of y and merge delete
later.trieAccounts.logs.get(address).map {
case Updated(Account(_, _, stateRoot, codeHash)) => this.trieAccounts += (address -> valueMerged.withStateRoot(stateRoot).withCodeHash(codeHash))
case Original(_) => this.trieAccounts += (address -> valueMerged)
case Deleted(_) => this.trieAccounts -= address
}
address
} toSet
this.trieAccounts.logs ++= (later.trieAccounts.logs -- alreadyMergedAddresses)
this
}
private def mergeTrieStorage(later: BlockWorldState): BlockWorldState = {
this.trieStorages ++= later.trieStorages
this
}
private def mergeCode(later: BlockWorldState): BlockWorldState = {
this.codes ++= later.codes
this
}
merge函数里首先根据竞态条件依次进行合并处理,然后再合并相关状态树等。这也和最初提出的三个方向基本保持一致。这个合并函数在并行执行结果处被调用
prevWorld.merge(txResult.world) match {
case Left(raceCondiftions) =>
log.debug(s"tx ${txResult.stx.hash} has race conditions with prev world state:\n$raceCondiftions")
reExecute(txResult.stx, prevWorld)
case Right(mergedWorld) =>
parallelCount += 1
currWorld = Some(mergedWorld)
txResults = txResults :+ txResult
}
涉及到一个问题,就是Reward的计算,处理它使用了类似的机制,分三步进行:
1、预计算
override def prepareBlock(
block: Block,
validators: Validators
)(implicit executor: ExecutionContext): Future[BlockPreparationResult] = {
val parentStateRoot = blockchain.getBlockHeaderByHash(block.header.parentHash).map(_.stateRoot)
val initialWorld = blockchain.getReadOnlyWorldState(None, blockchainConfig.accountStartNonce, parentStateRoot)
executePreparedTransactions(block.body.transactionList, initialWorld, block.header, validators.signedTransactionValidator) map {
case (execResult @ BlockResult(resultingWorldState, _, _, _), txExecuted) =>
val worldRewardPaid = payBlockReward(block)(resultingWorldState)
val worldPersisted = worldRewardPaid.commit().persist()
BlockPreparationResult(block.copy(body = block.body.copy(transactionList = txExecuted)), execResult, worldPersisted.stateRootHash)
}
}
2、执行中计算
override def executeBlock(block: Block, validators: Validators)(implicit executor: ExecutionContext): Future[Either[BlockExecutionError, BlockResult]] = {
val start1 = System.nanoTime
val parallelResult = executeBlockTransactions(block, validators.signedTransactionValidator, isParallel = true && !blockchainConfig.isDebugTraceEnabled) map {
case Right(blockResult) =>
log.debug(s"${block.header.number} parallel-executed in ${(System.nanoTime - start1) / 1000000}ms")
val start2 = System.nanoTime
val worldRewardPaid = payBlockReward(block)(blockResult.world)
}
......
}
3、并行结果后计算
parallelResult flatMap {
case Right((blockResult, worldCommitted)) => Future.successful(Right(blockResult))
case left @ Left(error) =>
log.debug(s"in parallel failed with error $error, try sequential ...")
val start1 = System.nanoTime
executeBlockTransactions(block, validators.signedTransactionValidator, isParallel = false) map {
case Right(blockResult) =>
log.debug(s"${block.header.number} sequential-executed in ${(System.nanoTime - start1) / 1000000}ms")
val worldRewardPaid = payBlockReward(block)(blockResult.world)
}
}
4、计算函数
private def payBlockReward(block: Block)(world: BlockWorldState): BlockWorldState = {
val minerAddress = Address(block.header.beneficiary)
val minerAccount = getAccountToPay(minerAddress)(world)
val minerReward = blockRewardCalculator.calcBlockMinerReward(block.header.number, block.body.uncleNodesList.size)
val afterMinerReward = world.saveAccount(minerAddress, minerAccount.increaseBalance(minerReward))
log.debug(s"Paying block ${block.header.number} reward of $minerReward to miner with account address $minerAddress")
block.body.uncleNodesList.foldLeft(afterMinerReward) { (ws, ommer) =>
val ommerAddress = Address(ommer.beneficiary)
val account = getAccountToPay(ommerAddress)(ws)
val ommerReward = blockRewardCalculator.calcOmmerMinerReward(block.header.number, ommer.number)
log.debug(s"Paying block ${block.header.number} reward of $ommerReward to ommer with account address $ommerAddress")
ws.saveAccount(ommerAddress, account.increaseBalance(ommerReward))
}
}
5、最终给付
private def postExecuteTransactions(
blockHeader: BlockHeader,
evmCfg: EvmConfig,
txResults: Vector[TxResult],
stats: Stats
)(world: BlockWorldState): Either[BlockExecutionError, BlockResult] = {
try {
val (accGas, accTxFee, accTouchedAddresses, accReceipts) = txResults.foldLeft(0L, UInt256.Zero, Set[Address](), Vector[Receipt]()) {
case ((accGas, accTxFee, accTouchedAddresses, accReceipts), TxResult(stx, worldAfterTx, gasUsed, txFee, logs, touchedAddresses, _, error, isRevert, _)) =>
val postTxState = if (evmCfg.eip658) {
if (error.isDefined || isRevert) Receipt.Failure else Receipt.Success
} else {
worldAfterTx.stateRootHash
//worldAfterTx.commit().stateRootHash // TODO here if get stateRootHash, should commit first, but then how about parallel running? how about sending a lazy evaulate function instead of value?
}
log.debug(s"Tx ${stx.hash} gasLimit: ${stx.tx.gasLimit}, gasUsed: $gasUsed, cumGasUsed: ${accGas + gasUsed}")
val receipt = Receipt(
postTxState = postTxState,
cumulativeGasUsed = accGas + gasUsed,
logsBloomFilter = BloomFilter.create(logs),
logs = logs
)
(accGas + gasUsed, accTxFee + txFee, accTouchedAddresses ++ touchedAddresses, accReceipts :+ receipt)
}
//计算并给付GAS
val minerAddress = Address(blockHeader.beneficiary)
val worldPayMinerForGas = world.pay(minerAddress, accTxFee)
// find empty touched accounts to be deleted
val deadAccounts = if (evmCfg.eip161) {
(accTouchedAddresses + minerAddress) filter (worldPayMinerForGas.isAccountDead)
} else {
Set[Address]()
}
//log.debug(s"touched accounts: ${result.addressesTouched}, miner: $minerAddress")
log.debug(s"dead accounts accounts: $deadAccounts")
val worldDeletedDeadAccounts = deleteAccounts(deadAccounts)(worldPayMinerForGas)
log.debug(s"$blockHeader, accGas $accGas, receipts = $accReceipts")
Right(BlockResult(worldDeletedDeadAccounts, accGas, accReceipts, stats))
} catch {
case MPTNodeMissingException(_, hash, table) => Left(MissingNodeExecptionError(blockHeader.number, hash, table))
case e: Throwable => throw e
}
}
最终更新到世界状态中做为下一次区块并行交易的起点状态。
通过上面的分析发现,khipu的并行思路不错,简单有效,而且基于目前网络上真正能够冲突的数据还是比较少的实际环境,提高的速度还是比较明显的。在下一篇,详细分析并行的过程。