Skip to content

Commit

Permalink
(unfinished) transactions in incoming messages
Browse files Browse the repository at this point in the history
Consensus-layer support for runtime transactions in roothash incoming
message.
  • Loading branch information
pro-wh committed Apr 26, 2022
1 parent 7084b4e commit 6956f2d
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions go/worker/compute/executor/committee/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import (
"github.com/cenkalti/backoff/v4"

cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
"github.com/oasisprotocol/oasis-core/go/runtime/transaction"
"github.com/oasisprotocol/oasis-core/go/runtime/txpool"
"github.com/oasisprotocol/oasis-core/go/worker/common/p2p/txsync"
Expand Down Expand Up @@ -39,7 +43,7 @@ func (n *Node) resolveBatchLocked(batch *unresolvedBatch, missingState NodeState
}
var ctx context.Context
ctx, n.missingTxsCancel = context.WithCancel(n.roundCtx)
go n.requestMissingTransactions(ctx)
go n.requestMissingTransactions(ctx, n.commonNode.CurrentConsensusBlock)
}
return resolvedBatch, nil
}
Expand Down Expand Up @@ -81,7 +85,45 @@ func (n *Node) handleNewCheckedTransactions(txs []*txpool.PendingCheckTransactio
}
}

func (n *Node) requestMissingTransactions(ctx context.Context) {
func (n *Node) requestMissingTransactions(ctx context.Context, consensusBlk *consensus.LightBlock) {
// Load transactions from roothash incoming messages.
func() {
inMsgs, err := n.commonNode.Consensus.RootHash().GetIncomingMessageQueue(ctx, &roothash.InMessageQueueRequest{
RuntimeID: n.commonNode.Runtime.ID(),
Height: consensusBlk.Height,
})
if err != nil {
n.logger.Error("failed to fetch incoming runtime message queue transactions",
"err", err,
)
// todo: propagate sanely
panic(err)
}
var inMsgTxs [][]byte
for _, msg := range inMsgs {
var data message.IncomingMessageData
if err = cbor.Unmarshal(msg.Data, &data); err != nil {
n.logger.Warn("incoming message data unmarshal failed",
"id", msg.ID,
"err", err,
)
continue
}
if err = data.ValidateBasic(); err != nil {
n.logger.Warn("incoming message data validate failed",
"id", msg.ID,
"err", err,
)
}
if data.Transaction != nil {
inMsgTxs = append(inMsgTxs, *data.Transaction)
}
}
if len(inMsgTxs) != 0 {
n.commonNode.TxPool.SubmitProposedBatch(inMsgTxs)
}
}()

requestOp := func() error {
// Determine what transactions are missing.
txHashes := func() []hash.Hash {
Expand Down

0 comments on commit 6956f2d

Please sign in to comment.