Skip to content

Commit

Permalink
(unfinished) queue concepts
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-wh committed May 20, 2022
1 parent 52b0251 commit ec73c49
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 0 deletions.
80 changes: 80 additions & 0 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package txpool

import (
"time"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
)

var (
_ UsableTransactionSource = (*localQueue)(nil)
_ RecheckableTransactionStore = (*localQueue)(nil)
_ RepublishableTransactionSource = (*localQueue)(nil)
)

// localQueue is a "front of the line" area for txs from our own node. We also keep these txs in order.
type localQueue struct {
txs [][]byte
indexesByHash map[hash.Hash]int
}

func (lq *localQueue) GetSchedulingSuggestion() [][]byte {
return lq.txs[:]
}

func (lq *localQueue) GetTxByHash(h hash.Hash) ([]byte, bool) {
i, ok := lq.indexesByHash[h]
if !ok {
return nil, false
}
return lq.txs[i], true
}

func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) {
removeAny := false
for _, h := range hashes {
if _, ok := lq.indexesByHash[h]; ok {
removeAny = true
delete(lq.indexesByHash, h)
}
}
if removeAny {
return
}
keptHashes := make([]*hash.Hash, len(lq.txs))
for h, i := range lq.indexesByHash {
keptHashes[i] = &h
}
var remainingTxs [][]byte
for i, hp := range keptHashes {
if hp == nil {
continue
}
j := len(remainingTxs)
remainingTxs = append(remainingTxs, lq.txs[i])
lq.indexesByHash[*hp] = j
}
lq.txs = remainingTxs
}

func (lq *localQueue) TakeAll() [][]byte {
txs := lq.txs
lq.txs = nil
lq.indexesByHash = make(map[hash.Hash]int)
return txs
}

func (lq *localQueue) OfferChecked(txs [][]byte) {
j := len(lq.txs)
for _, tx := range txs {
h := hash.NewFromBytes(tx)
lq.indexesByHash[h] = j
j++
}
lq.txs = append(lq.txs, txs...)
}

func (lq *localQueue) GetTxsToPublish(now time.Time) ([][]byte, time.Time) {
// todo: reexamine republish mechanism
return nil, now.Add(60 * time.Second)
}
60 changes: 60 additions & 0 deletions go/runtime/txpool/main_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package txpool

import (
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/logging"
)

var (
_ UsableTransactionSource = (*mainQueue)(nil)
_ RecheckableTransactionStore = (*mainQueue)(nil)
)

// mainQueue is a priority queue for transactions that we give no special treatment.
type mainQueue struct {
// This implementation adapts the existing scheduleQueue code.
inner scheduleQueue
}

func (mq *mainQueue) GetSchedulingSuggestion() [][]byte {
txMetas := mq.inner.getPrioritizedBatch(nil, 50)
var txs [][]byte
for _, txMeta := range txMetas {
txs = append(txs, txMeta.tx)
}
return txs
}

func (mq *mainQueue) GetTxByHash(h hash.Hash) ([]byte, bool) {
txMetas, _ := mq.inner.getKnownBatch([]hash.Hash{h})
if txMetas[0] == nil {
return nil, false
}
return txMetas[0].tx, true
}

func (mq *mainQueue) HandleTxsUsed(hashes []hash.Hash) {
mq.inner.remove(hashes)
}

func (mq *mainQueue) TakeAll() [][]byte {
txMetas := mq.inner.getAll()
mq.inner.clear()
var txs [][]byte
for _, txMeta := range txMetas {
txs = append(txs, txMeta.tx)
}
return txs
}

func (mq *mainQueue) OfferChecked(txs [][]byte) {
for _, tx := range txs {
txMeta := newTransaction(tx, txStatusChecked)
if err := mq.inner.add(txMeta); err != nil {
logging.GetLogger("mainQueue").Warn("offerChecked tx not wanted",
"hash", txMeta.hash,
"err", err,
)
}
}
}
36 changes: 36 additions & 0 deletions go/runtime/txpool/queues.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package txpool

import (
"time"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
)

// UsableTransactionSource is a place to retrieve txs that are "good enough." "Good enough" variously means CheckTx'd,
// came from roothash incoming message, or came from our own node.
type UsableTransactionSource interface {
// GetSchedulingSuggestion returns some number of txs to give to the scheduler as part of the initial
// batch.
GetSchedulingSuggestion() [][]byte
// GetTxByHash returns the specific tx, if it is in this queue. The bool is like `value, ok := txMap[key]`. Used
// for resolving a batch from hashes and serving txSync.
GetTxByHash(h hash.Hash) ([]byte, bool)
// HandleTxsUsed is a callback to indicate that the scheduler is done with a set of txs, by hash. For most
// implementations, remove it from internal storage.
HandleTxsUsed(hashes []hash.Hash)
}

// RecheckableTransactionStore provides methods for rechecking.
type RecheckableTransactionStore interface {
// TakeAll removes all txs and returns them.
TakeAll() [][]byte
// OfferChecked adds txs that are checked.
OfferChecked(txs [][]byte)
}

// RepublishableTransactionSource is a place to get txs that we want to push.
type RepublishableTransactionSource interface {
// GetTxsToPublish gets txs that this queue wants to publish between last call and now given, as well as next time
// that it wants to publish any txs.
GetTxsToPublish(now time.Time) ([][]byte, time.Time)
}
25 changes: 25 additions & 0 deletions go/runtime/txpool/rim_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package txpool

import "github.com/oasisprotocol/oasis-core/go/common/crypto/hash"

var (
_ UsableTransactionSource = (*rimQueue)(nil)
)

// rimQueue exposes transactions form roothash incoming messages.
type rimQueue struct{}

func (rq *rimQueue) GetSchedulingSuggestion() [][]byte {
// Runtimes instead get transactions from the incoming messages.
return nil
}

func (rq *rimQueue) GetTxByHash(h hash.Hash) ([]byte, bool) {
// TODO implement me
panic("implement me")
// get incoming messages, parse them, extract txs, hash them, look up by hash here
}

func (rq *rimQueue) HandleTxsUsed(hashes []hash.Hash) {
// The roothash module manages the incoming message queue on its own, so we don't do anything here.
}

0 comments on commit ec73c49

Please sign in to comment.