Skip to content

Commit

Permalink
(unfinished) transactions in incoming messages
Browse files Browse the repository at this point in the history
Consensus-layer support for runtime transactions in roothash incoming
message.
  • Loading branch information
pro-wh committed Apr 22, 2022
1 parent 9f399d0 commit d3c4537
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 8 deletions.
21 changes: 15 additions & 6 deletions go/consensus/tendermint/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
abciAPI "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
)

// getRuntimeState fetches the current runtime state and performs common
Expand Down Expand Up @@ -379,17 +381,24 @@ func (app *rootHashApplication) submitMsg(

// Queue message.
inMsg := &message.IncomingMessage{
ID: meta.NextSequenceNumber,
Caller: ctx.CallerAddress(),
Tag: msg.Tag,
Fee: msg.Fee,
Tokens: msg.Tokens,
Data: msg.Data,
ID: meta.NextSequenceNumber,
Caller: ctx.CallerAddress(),
Tag: msg.Tag,
Fee: msg.Fee,
Tokens: msg.Tokens,
Transaction: msg.Transaction,
Data: msg.Data,
}
if err = state.SetIncomingMessageInQueue(ctx, rtState.Runtime.ID, inMsg); err != nil {
return err
}

if msg.Transaction != nil {
// todo: access txpool somehow
var node *committee.Node
node.TxPool.AddIncomingTx(hash.NewFromBytes(*msg.Transaction))
}

// Update next sequence number.
meta.Size++
meta.NextSequenceNumber++
Expand Down
3 changes: 3 additions & 0 deletions go/roothash/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ type SubmitMsg struct {
// Tokens are any tokens sent into the runtime as part of the message being sent. The tokens are
// transferred before the message is processed by the runtime.
Tokens quantity.Quantity `json:"tokens,omitempty"`
// Transaction is an optional transaction. `nil` means no transaction, while `tx = nil; &tx` is
// a transaction that is the empty byte string. Go, I swear.
Transaction *[]byte `json:"transaction,omitempty"`
// Data is arbitrary runtime-dependent data.
Data []byte `json:"data,omitempty"`
}
Expand Down
4 changes: 4 additions & 0 deletions go/roothash/api/message/incoming_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type IncomingMessage struct {
// transferred before the message is processed by the runtime.
Tokens quantity.Quantity `json:"tokens,omitempty"`

// Transaction is an optional transaction. `nil` means no transaction, while `tx = nil; &tx` is
// a transaction that is the empty byte string. Go, I swear.
Transaction *[]byte `json:"transaction,omitempty"`

// Data is arbitrary runtime-dependent data.
Data []byte `json:"data,omitempty"`
}
Expand Down
2 changes: 2 additions & 0 deletions go/runtime/host/protocol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ type RuntimeExecuteTxBatchResponse struct {
// TxRejectHashes are the transaction hashes of transactions that should be immediately removed
// from the scheduling queue as they are invalid.
TxRejectHashes []hash.Hash `json:"tx_reject_hashes,omitempty"`
// TxIncomingHashes are the transactions that the runtime is scheduling from outside the queue.
TxIncomingHashes []hash.Hash `json:"extra_txs,omitempty"`
// TxInputRoot is the root hash of all transaction inputs.
TxInputRoot hash.Hash `json:"tx_input_root,omitempty"`
// TxInputWriteLog is the write log for generating transaction inputs.
Expand Down
22 changes: 20 additions & 2 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type TransactionPool interface {
// SubmitTxNoWait adds the transaction into the transaction pool and returns immediately.
SubmitTxNoWait(ctx context.Context, tx []byte, meta *TransactionMeta) error

// AddIncomingTx uh
AddIncomingTx(tx hash.Hash)

// SubmitProposedBatch adds the given (possibly new) transaction batch into the current
// proposal queue.
SubmitProposedBatch(batch [][]byte)
Expand All @@ -87,6 +90,9 @@ type TransactionPool interface {
// RemoveTxBatch removes a transaction batch from the transaction pool.
RemoveTxBatch(txs []hash.Hash)

// AdvanceIncomingTxs uh
AdvanceIncomingTxs(count int)

// GetPrioritizedBatch returns a batch of transactions ordered by priority.
//
// Offset specifies the transaction hash that should serve as an offset when returning
Expand Down Expand Up @@ -181,8 +187,9 @@ type txPool struct {
schedulerTicker *time.Ticker
schedulerNotifier *pubsub.Broker

proposedTxsLock sync.Mutex
proposedTxs map[hash.Hash]*Transaction
proposedTxsLock sync.Mutex
proposedTxs map[hash.Hash]*Transaction
incomingMessageTxs []hash.Hash

blockInfoLock sync.Mutex
blockInfo *BlockInfo
Expand Down Expand Up @@ -276,6 +283,11 @@ func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error {
return nil
}

func (t *txPool) AddIncomingTx(tx hash.Hash) {
// todo: locking?
t.incomingMessageTxs = append(t.incomingMessageTxs, tx)
}

func (t *txPool) SubmitProposedBatch(batch [][]byte) {
// Also ingest into the regular pool (may fail).
for _, rawTx := range batch {
Expand Down Expand Up @@ -323,6 +335,12 @@ func (t *txPool) RemoveTxBatch(txs []hash.Hash) {
pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size()))
}

func (t *txPool) AdvanceIncomingTxs(count int) {
// todo: locking?
// todo: will later append really free anything? would deque be better?
t.incomingMessageTxs = t.incomingMessageTxs[count:]
}

func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*Transaction {
return t.schedulerQueue.getPrioritizedBatch(offset, limit)
}
Expand Down

0 comments on commit d3c4537

Please sign in to comment.