Skip to content

Commit

Permalink
Merge branch 'save-chunk' into 'master'
Browse files Browse the repository at this point in the history
save blocks by chunk

See merge request open-platform/chain!320
  • Loading branch information
george-bisiarin committed Jan 21, 2019
2 parents 47311eb + b722ee3 commit d76763e
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 55 deletions.
11 changes: 11 additions & 0 deletions src/main/kotlin/io/openfuture/chain/core/service/Services.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import io.openfuture.chain.core.model.entity.transaction.unconfirmed.Unconfirmed
import io.openfuture.chain.core.model.entity.transaction.unconfirmed.UnconfirmedTransferTransaction
import io.openfuture.chain.core.model.entity.transaction.unconfirmed.UnconfirmedVoteTransaction
import io.openfuture.chain.core.model.node.*
import io.openfuture.chain.core.sync.SyncMode
import io.openfuture.chain.network.message.consensus.PendingBlockMessage
import io.openfuture.chain.network.message.core.*
import io.openfuture.chain.network.message.sync.GenesisBlockMessage
Expand Down Expand Up @@ -49,6 +50,8 @@ interface BlockService {

fun save(block: Block)

fun saveChunk(blocksChunk: List<Block>, syncMode: SyncMode)

fun getAfterCurrentHash(hash: String): List<Block>

fun isExists(hash: String): Boolean
Expand Down Expand Up @@ -136,6 +139,8 @@ interface TransferTransactionService {

fun add(request: TransferTransactionRequest): UnconfirmedTransferTransaction

fun toBlock(transaction: TransferTransaction, block: MainBlock): TransferTransaction

fun toBlock(message: TransferTransactionMessage, block: MainBlock): TransferTransaction

fun verify(message: TransferTransactionMessage): Boolean
Expand All @@ -150,6 +155,8 @@ interface RewardTransactionService {

fun create(timestamp: Long, fees: Long): RewardTransactionMessage

fun toBlock(transaction: RewardTransaction, block: MainBlock)

fun toBlock(message: RewardTransactionMessage, block: MainBlock)

fun verify(message: RewardTransactionMessage): Boolean
Expand All @@ -176,6 +183,8 @@ interface VoteTransactionService {

fun add(request: VoteTransactionRequest): UnconfirmedVoteTransaction

fun toBlock(transaction: VoteTransaction, block: MainBlock): VoteTransaction

fun toBlock(message: VoteTransactionMessage, block: MainBlock): VoteTransaction

fun verify(message: VoteTransactionMessage): Boolean
Expand All @@ -196,6 +205,8 @@ interface DelegateTransactionService {

fun add(request: DelegateTransactionRequest): UnconfirmedDelegateTransaction

fun toBlock(transaction: DelegateTransaction, block: MainBlock): DelegateTransaction

fun toBlock(message: DelegateTransactionMessage, block: MainBlock): DelegateTransaction

fun verify(message: DelegateTransactionMessage): Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@ package io.openfuture.chain.core.service.block

import io.openfuture.chain.core.exception.NotFoundException
import io.openfuture.chain.core.model.entity.block.Block
import io.openfuture.chain.core.model.entity.block.GenesisBlock
import io.openfuture.chain.core.model.entity.block.MainBlock
import io.openfuture.chain.core.model.entity.transaction.confirmed.DelegateTransaction
import io.openfuture.chain.core.model.entity.transaction.confirmed.Transaction
import io.openfuture.chain.core.model.entity.transaction.confirmed.TransferTransaction
import io.openfuture.chain.core.model.entity.transaction.confirmed.VoteTransaction
import io.openfuture.chain.core.repository.BlockRepository
import io.openfuture.chain.core.service.BlockService
import io.openfuture.chain.core.service.*
import io.openfuture.chain.core.sync.SyncMode
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class DefaultBlockService(
private val repository: BlockRepository<Block>
private val repository: BlockRepository<Block>,
private val delegateService: DelegateService,
private val voteTransactionService: VoteTransactionService,
private val rewardTransactionService: RewardTransactionService,
private val delegateTransactionService: DelegateTransactionService,
private val transferTransactionService: TransferTransactionService
) : BlockService {

@Transactional(readOnly = true)
Expand Down Expand Up @@ -46,4 +58,58 @@ class DefaultBlockService(
@Transactional(readOnly = true)
override fun getCurrentHeight(): Long = repository.getCurrentHeight()

@Transactional
override fun saveChunk(blocksChunk: List<Block>, syncMode: SyncMode) {
blocksChunk.forEach { block ->
if (block is MainBlock) {
val rewardTransaction = block.payload.rewardTransaction.first()
block.payload.rewardTransaction = mutableListOf()

val transactions = mutableListOf<Transaction>()

if (syncMode == SyncMode.FULL) {
transactions.addAll(block.payload.transferTransactions)
transactions.addAll(block.payload.voteTransactions)
transactions.addAll(block.payload.delegateTransactions)

block.payload.transferTransactions = mutableListOf()
block.payload.voteTransactions = mutableListOf()
block.payload.delegateTransactions = mutableListOf()
}

this.save(block)
rewardTransaction.block = block
rewardTransactionService.toBlock(rewardTransaction, block)

if (syncMode == SyncMode.FULL) {
transactions.forEach {
if (it is TransferTransaction) {
it.block = block
transferTransactionService.toBlock(it, block)
}
if (it is DelegateTransaction) {
it.block = block
delegateTransactionService.toBlock(it, block)
}
if (it is VoteTransaction) {
it.block = block
voteTransactionService.toBlock(it, block)
}
}
}
} else if (block is GenesisBlock) {
val delegates = block.payload.activeDelegates.toMutableList()
block.payload.activeDelegates.clear()
delegates.forEach { delegate ->
if (delegateService.isExistsByPublicKey(delegate.publicKey)) {
block.payload.activeDelegates.add(delegateService.getByPublicKey(delegate.publicKey))
} else {
block.payload.activeDelegates.add(delegateService.save(delegate))
}
}
this.save(block)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class DefaultDelegateTransactionService(
}
}

@Transactional
override fun toBlock(transaction: DelegateTransaction, block: MainBlock): DelegateTransaction {
return toBlock(transaction.toMessage(), block)
}

@Transactional
override fun toBlock(message: DelegateTransactionMessage, block: MainBlock): DelegateTransaction {
BlockchainLock.writeLock.lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class DefaultRewardTransactionService(
return RewardTransactionMessage(timestamp, fee, senderAddress, hash, signature, publicKey, reward, delegate.address)
}

@Transactional
override fun toBlock(transaction: RewardTransaction, block: MainBlock) {
return toBlock(transaction.toMessage(), block)
}

@Transactional
override fun toBlock(message: RewardTransactionMessage, block: MainBlock) {
BlockchainLock.writeLock.lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class DefaultTransferTransactionService(
}
}

@Transactional
override fun toBlock(transaction: TransferTransaction, block: MainBlock): TransferTransaction {
return toBlock(transaction.toMessage(), block)
}

@Transactional
override fun toBlock(message: TransferTransactionMessage, block: MainBlock): TransferTransaction {
BlockchainLock.writeLock.lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ internal class DefaultVoteTransactionService(
}
}

@Transactional
override fun toBlock(transaction: VoteTransaction, block: MainBlock): VoteTransaction {
return toBlock(transaction.toMessage(), block)
}

@Transactional
override fun toBlock(message: VoteTransactionMessage, block: MainBlock): VoteTransaction {
BlockchainLock.writeLock.lock()
Expand Down
61 changes: 8 additions & 53 deletions src/main/kotlin/io/openfuture/chain/core/sync/ChainSynchronizer.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package io.openfuture.chain.core.sync

import io.openfuture.chain.consensus.property.ConsensusProperties
import io.openfuture.chain.core.model.entity.Delegate
import io.openfuture.chain.core.model.entity.block.Block
import io.openfuture.chain.core.model.entity.block.GenesisBlock
import io.openfuture.chain.core.model.entity.block.MainBlock
import io.openfuture.chain.core.model.entity.block.payload.MainBlockPayload
import io.openfuture.chain.core.model.entity.transaction.confirmed.*
import io.openfuture.chain.core.model.entity.transaction.confirmed.DelegateTransaction
import io.openfuture.chain.core.model.entity.transaction.confirmed.RewardTransaction
import io.openfuture.chain.core.model.entity.transaction.confirmed.TransferTransaction
import io.openfuture.chain.core.model.entity.transaction.confirmed.VoteTransaction
import io.openfuture.chain.core.service.*
import io.openfuture.chain.core.sync.SyncMode.FULL
import io.openfuture.chain.core.sync.SyncStatus.*
Expand All @@ -21,7 +25,6 @@ import io.openfuture.chain.network.service.NetworkApiService
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
Expand All @@ -32,7 +35,6 @@ import javax.xml.bind.ValidationException
class ChainSynchronizer(
private val properties: NodeProperties,
private val blockService: BlockService,
private val delegateService: DelegateService,
private val networkApiService: NetworkApiService,
private val genesisBlockService: GenesisBlockService,
private val voteTransactionService: VoteTransactionService,
Expand Down Expand Up @@ -86,7 +88,6 @@ class ChainSynchronizer(
}
}

@Transactional
fun onEpochResponse(message: EpochResponseMessage) {
resetRequestScheduler()
val nodesInfo = genesisBlockService.getLast().payload.activeDelegates.map { getNodeInfo(it) }.toList()
Expand Down Expand Up @@ -221,55 +222,9 @@ class ChainSynchronizer(
val lastLocalBlock = blockService.getLast()
val filteredStorage = syncSession!!.getStorage().filter { it.height > lastLocalBlock.height }

filteredStorage.asReversed().forEach { block ->
if (block is MainBlock) {
val rewardTransaction = block.payload.rewardTransaction.first()
block.payload.rewardTransaction = mutableListOf()

val transactions = mutableListOf<Transaction>()

if (syncSession!!.syncMode == SyncMode.FULL) {
transactions.addAll(block.payload.transferTransactions)
transactions.addAll(block.payload.voteTransactions)
transactions.addAll(block.payload.delegateTransactions)

block.payload.transferTransactions = mutableListOf()
block.payload.voteTransactions = mutableListOf()
block.payload.delegateTransactions = mutableListOf()
}

blockService.save(block)
rewardTransaction.block = block
rewardTransactionService.toBlock(rewardTransaction.toMessage(), block)

if (syncSession!!.syncMode == SyncMode.FULL) {
transactions.forEach {
if (it is TransferTransaction) {
it.block = block
transferTransactionService.toBlock(it.toMessage(), block)
}
if (it is DelegateTransaction) {
it.block = block
delegateTransactionService.toBlock(it.toMessage(), block)
}
if (it is VoteTransaction) {
it.block = block
voteTransactionService.toBlock(it.toMessage(), block)
}
}
}
} else if (block is GenesisBlock) {
val delegates = block.payload.activeDelegates.toMutableList()
block.payload.activeDelegates.clear()
delegates.forEach { delegate ->
if (delegateService.isExistsByPublicKey(delegate.publicKey)) {
block.payload.activeDelegates.add(delegateService.getByPublicKey(delegate.publicKey))
} else {
block.payload.activeDelegates.add(delegateService.save(delegate))
}
}
blockService.save(block)
}
filteredStorage.asReversed().chunked(properties.syncBatchSize!!).forEach {
blockService.saveChunk(it, syncSession!!.syncMode)
log.debug("Blocks saved from ${it.first().height} to ${it.last().height}")
}

syncSession = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class NodeProperties(
@field:NotNull
var peerPenalty: Long? = null,

@field:NotNull
var syncBatchSize: Int? = null,

@field:NotEmpty
var ntpServers: List<String> = emptyList(),

Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ node.config-path=config.json
node.peer-penalty=3600000
node.expiry=10000
node.sync-expiry=10000
node.sync-batch-size=22
# Time value for choosing next ntp server, which equals 24 hours
node.next-ntp-server-interval=86400000
node.ntp-offset-threshold=100
Expand Down

0 comments on commit d76763e

Please sign in to comment.