From ec73c49408a0e4e376010e088191def9d0b8ea3e Mon Sep 17 00:00:00 2001 From: Warren He Date: Thu, 19 May 2022 17:05:51 -0700 Subject: [PATCH] (unfinished) queue concepts --- go/runtime/txpool/local_queue.go | 80 ++++++++++++++++++++++++++++++++ go/runtime/txpool/main_queue.go | 60 ++++++++++++++++++++++++ go/runtime/txpool/queues.go | 36 ++++++++++++++ go/runtime/txpool/rim_queue.go | 25 ++++++++++ 4 files changed, 201 insertions(+) create mode 100644 go/runtime/txpool/local_queue.go create mode 100644 go/runtime/txpool/main_queue.go create mode 100644 go/runtime/txpool/queues.go create mode 100644 go/runtime/txpool/rim_queue.go diff --git a/go/runtime/txpool/local_queue.go b/go/runtime/txpool/local_queue.go new file mode 100644 index 00000000000..cd4ce3e3024 --- /dev/null +++ b/go/runtime/txpool/local_queue.go @@ -0,0 +1,80 @@ +package txpool + +import ( + "time" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" +) + +var ( + _ UsableTransactionSource = (*localQueue)(nil) + _ RecheckableTransactionStore = (*localQueue)(nil) + _ RepublishableTransactionSource = (*localQueue)(nil) +) + +// localQueue is a "front of the line" area for txs from our own node. We also keep these txs in order. +type localQueue struct { + txs [][]byte + indexesByHash map[hash.Hash]int +} + +func (lq *localQueue) GetSchedulingSuggestion() [][]byte { + return lq.txs[:] +} + +func (lq *localQueue) GetTxByHash(h hash.Hash) ([]byte, bool) { + i, ok := lq.indexesByHash[h] + if !ok { + return nil, false + } + return lq.txs[i], true +} + +func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) { + removeAny := false + for _, h := range hashes { + if _, ok := lq.indexesByHash[h]; ok { + removeAny = true + delete(lq.indexesByHash, h) + } + } + if removeAny { + return + } + keptHashes := make([]*hash.Hash, len(lq.txs)) + for h, i := range lq.indexesByHash { + keptHashes[i] = &h + } + var remainingTxs [][]byte + for i, hp := range keptHashes { + if hp == nil { + continue + } + j := len(remainingTxs) + remainingTxs = append(remainingTxs, lq.txs[i]) + lq.indexesByHash[*hp] = j + } + lq.txs = remainingTxs +} + +func (lq *localQueue) TakeAll() [][]byte { + txs := lq.txs + lq.txs = nil + lq.indexesByHash = make(map[hash.Hash]int) + return txs +} + +func (lq *localQueue) OfferChecked(txs [][]byte) { + j := len(lq.txs) + for _, tx := range txs { + h := hash.NewFromBytes(tx) + lq.indexesByHash[h] = j + j++ + } + lq.txs = append(lq.txs, txs...) +} + +func (lq *localQueue) GetTxsToPublish(now time.Time) ([][]byte, time.Time) { + // todo: reexamine republish mechanism + return nil, now.Add(60 * time.Second) +} diff --git a/go/runtime/txpool/main_queue.go b/go/runtime/txpool/main_queue.go new file mode 100644 index 00000000000..08aedcc36fc --- /dev/null +++ b/go/runtime/txpool/main_queue.go @@ -0,0 +1,60 @@ +package txpool + +import ( + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/common/logging" +) + +var ( + _ UsableTransactionSource = (*mainQueue)(nil) + _ RecheckableTransactionStore = (*mainQueue)(nil) +) + +// mainQueue is a priority queue for transactions that we give no special treatment. +type mainQueue struct { + // This implementation adapts the existing scheduleQueue code. + inner scheduleQueue +} + +func (mq *mainQueue) GetSchedulingSuggestion() [][]byte { + txMetas := mq.inner.getPrioritizedBatch(nil, 50) + var txs [][]byte + for _, txMeta := range txMetas { + txs = append(txs, txMeta.tx) + } + return txs +} + +func (mq *mainQueue) GetTxByHash(h hash.Hash) ([]byte, bool) { + txMetas, _ := mq.inner.getKnownBatch([]hash.Hash{h}) + if txMetas[0] == nil { + return nil, false + } + return txMetas[0].tx, true +} + +func (mq *mainQueue) HandleTxsUsed(hashes []hash.Hash) { + mq.inner.remove(hashes) +} + +func (mq *mainQueue) TakeAll() [][]byte { + txMetas := mq.inner.getAll() + mq.inner.clear() + var txs [][]byte + for _, txMeta := range txMetas { + txs = append(txs, txMeta.tx) + } + return txs +} + +func (mq *mainQueue) OfferChecked(txs [][]byte) { + for _, tx := range txs { + txMeta := newTransaction(tx, txStatusChecked) + if err := mq.inner.add(txMeta); err != nil { + logging.GetLogger("mainQueue").Warn("offerChecked tx not wanted", + "hash", txMeta.hash, + "err", err, + ) + } + } +} diff --git a/go/runtime/txpool/queues.go b/go/runtime/txpool/queues.go new file mode 100644 index 00000000000..654bcb55013 --- /dev/null +++ b/go/runtime/txpool/queues.go @@ -0,0 +1,36 @@ +package txpool + +import ( + "time" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" +) + +// UsableTransactionSource is a place to retrieve txs that are "good enough." "Good enough" variously means CheckTx'd, +// came from roothash incoming message, or came from our own node. +type UsableTransactionSource interface { + // GetSchedulingSuggestion returns some number of txs to give to the scheduler as part of the initial + // batch. + GetSchedulingSuggestion() [][]byte + // GetTxByHash returns the specific tx, if it is in this queue. The bool is like `value, ok := txMap[key]`. Used + // for resolving a batch from hashes and serving txSync. + GetTxByHash(h hash.Hash) ([]byte, bool) + // HandleTxsUsed is a callback to indicate that the scheduler is done with a set of txs, by hash. For most + // implementations, remove it from internal storage. + HandleTxsUsed(hashes []hash.Hash) +} + +// RecheckableTransactionStore provides methods for rechecking. +type RecheckableTransactionStore interface { + // TakeAll removes all txs and returns them. + TakeAll() [][]byte + // OfferChecked adds txs that are checked. + OfferChecked(txs [][]byte) +} + +// RepublishableTransactionSource is a place to get txs that we want to push. +type RepublishableTransactionSource interface { + // GetTxsToPublish gets txs that this queue wants to publish between last call and now given, as well as next time + // that it wants to publish any txs. + GetTxsToPublish(now time.Time) ([][]byte, time.Time) +} diff --git a/go/runtime/txpool/rim_queue.go b/go/runtime/txpool/rim_queue.go new file mode 100644 index 00000000000..8f8de80be4c --- /dev/null +++ b/go/runtime/txpool/rim_queue.go @@ -0,0 +1,25 @@ +package txpool + +import "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + +var ( + _ UsableTransactionSource = (*rimQueue)(nil) +) + +// rimQueue exposes transactions form roothash incoming messages. +type rimQueue struct{} + +func (rq *rimQueue) GetSchedulingSuggestion() [][]byte { + // Runtimes instead get transactions from the incoming messages. + return nil +} + +func (rq *rimQueue) GetTxByHash(h hash.Hash) ([]byte, bool) { + // TODO implement me + panic("implement me") + // get incoming messages, parse them, extract txs, hash them, look up by hash here +} + +func (rq *rimQueue) HandleTxsUsed(hashes []hash.Hash) { + // The roothash module manages the incoming message queue on its own, so we don't do anything here. +}