TXM In-memory: add priority queue: STEP 1 #12121

Closed
wants to merge 25 commits

Commits (25 total; changes shown are from 20 commits)
c0cfa05
add priority queue
poopoothegorilla Feb 21, 2024
d54cd8d
Merge branch 'develop' into jtw/step-1-in-memory-work
poopoothegorilla Feb 29, 2024
a516d5e
move RWMutex to PriorityQueue
poopoothegorilla Feb 29, 2024
f6d3d0d
Merge branch 'develop' into jtw/step-1-in-memory-work
poopoothegorilla Mar 6, 2024
51d413b
unexport and move priority queue work
poopoothegorilla Mar 6, 2024
24aa17f
rename parameter from maxUnstarted to capacity
poopoothegorilla Mar 6, 2024
fd6d36b
apply initial recommendations
poopoothegorilla Mar 6, 2024
40f434a
implement all unit tests for TxPriorityQueue
poopoothegorilla Mar 6, 2024
c9865ab
fix test name
poopoothegorilla Mar 6, 2024
9a2f956
remove popped ID from idToIndex map
poopoothegorilla Mar 6, 2024
6e5913a
fix TxPriorityQueue
poopoothegorilla Mar 6, 2024
d57096c
fix potential race condition
poopoothegorilla Mar 7, 2024
e1756d0
panic if misconfigured capacity for priority heap
poopoothegorilla Mar 8, 2024
4306f3e
Merge branch 'develop' into jtw/step-1-in-memory-work
poopoothegorilla Mar 8, 2024
dd11aa8
update changeset
poopoothegorilla Mar 8, 2024
5b541ce
address some comments
poopoothegorilla Mar 11, 2024
960f6dc
add back line that sets element to nil
poopoothegorilla Mar 11, 2024
6f82130
minor nit
poopoothegorilla Mar 11, 2024
41a1b76
Merge branch 'develop' into jtw/step-1-in-memory-work
poopoothegorilla Mar 13, 2024
40f2b22
address comments
poopoothegorilla Mar 14, 2024
1d51ff0
move len to caller
poopoothegorilla Mar 21, 2024
130ec9a
Merge branch 'develop' into jtw/step-1-in-memory-work
poopoothegorilla Mar 21, 2024
ffb5ac8
Merge branch 'develop' into jtw/step-1-in-memory-work
poopoothegorilla Mar 22, 2024
108ad38
Merge branch 'develop' into jtw/step-1-in-memory-work
amit-momin Jun 26, 2024
7adbba8
Updated changeset with tag and fixed lint error
amit-momin Jun 26, 2024
5 changes: 5 additions & 0 deletions .changeset/seven-rice-end.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

adds an internal queue package which is part of refactor work
92 changes: 92 additions & 0 deletions common/internal/queues/priority_heap.go
@@ -0,0 +1,92 @@
package queues

import (
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

// priorityHeap is a priority queue of transactions prioritized by creation time. The oldest transaction is at the front of the queue.
// It implements the heap interface in the container/heap package.
type priorityHeap[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
txs []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
idToIndex map[int64]int
}

// NewPriorityHeap returns a new priorityHeap instance
func NewPriorityHeap[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
](capacity int) *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
if capacity <= 0 {
panic("priority_heap: capacity must be greater than 0")
}

pq := priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
txs: make([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], 0, capacity),
idToIndex: make(map[int64]int),
}

return &pq
}

// Close clears the queue
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() {
pq.txs = nil
pq.idToIndex = nil
}

// FindIndexByID returns the index of the transaction with the given ID
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindIndexByID(id int64) (int, bool) {
i, ok := pq.idToIndex[id]
return i, ok
}

// Peek returns the next transaction to be processed
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Peek() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
if len(pq.txs) == 0 {
return nil
}
return pq.txs[0]
}

// Cap returns the capacity of the queue
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Cap() int {
return cap(pq.txs)
}

// Len, Less, Swap, Push, and Pop methods implement the heap interface
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Len() int {
return len(pq.txs)
}
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Less(i, j int) bool {
// We want Pop to give us the oldest, not newest, transaction based on creation time
return pq.txs[i].CreatedAt.Before(pq.txs[j].CreatedAt)
}
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Swap(i, j int) {
pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i]
pq.idToIndex[pq.txs[i].ID] = i
pq.idToIndex[pq.txs[j].ID] = j
}
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Push(tx any) {
pq.txs = append(pq.txs, tx.(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]))
pq.idToIndex[tx.(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]).ID] = len(pq.txs) - 1
}
func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pop() any {
old := pq.txs
n := len(old)
tx := old[n-1]
Collaborator:
Isn't this going to panic when n = 0?

Contributor Author:
This was actually borrowed from https://pkg.go.dev/container/heap#example-package-PriorityQueue, and the bounds checks are done by the caller in this case.

old[n-1] = nil // avoid memory leak
pq.txs = old[0 : n-1]
delete(pq.idToIndex, tx.ID)
return tx
}
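
For reference, here is a minimal, self-contained sketch (not part of this PR; the timeHeap and item types are toy stand-ins) of the container/heap contract discussed in the thread above: heap.Pop swaps the root (the minimum per Less, i.e. the oldest tx) to the end of the slice, fixes up the heap, and only then calls the interface's Pop, so the element taken from the tail is the oldest one. Callers are expected to guard with Len first, which is why the interface Pop itself carries no bounds check.

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type item struct{ createdAt time.Time }

// timeHeap mirrors the five heap.Interface methods of priorityHeap on a toy type.
type timeHeap []*item

func (h timeHeap) Len() int           { return len(h) }
func (h timeHeap) Less(i, j int) bool { return h[i].createdAt.Before(h[j].createdAt) }
func (h timeHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *timeHeap) Push(x any)        { *h = append(*h, x.(*item)) }
func (h *timeHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1] // by the time this runs, heap.Pop has already swapped the root (oldest) to the end
	old[n-1] = nil // avoid memory leak
	*h = old[:n-1]
	return it
}

func main() {
	now := time.Now()
	h := &timeHeap{}
	heap.Push(h, &item{createdAt: now.Add(2 * time.Minute)})
	heap.Push(h, &item{createdAt: now}) // oldest
	heap.Push(h, &item{createdAt: now.Add(time.Minute)})

	// Callers guard with Len first; calling heap.Pop on an empty heap
	// panics with an index out of range, as raised in the review.
	if h.Len() > 0 {
		oldest := heap.Pop(h).(*item)
		fmt.Println(oldest.createdAt.Equal(now)) // true: Pop yields the oldest, not the newest
	}
}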
133 changes: 133 additions & 0 deletions common/internal/queues/tx_priority_queue.go
@@ -0,0 +1,133 @@
package queues

import (
"container/heap"
"sync"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

// TxPriorityQueue is a priority queue of transactions prioritized by creation time. The oldest transaction is at the front of the queue.
type TxPriorityQueue[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
sync.RWMutex
ph *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewTxPriorityQueue returns a new TxPriorityQueue instance
func NewTxPriorityQueue[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
](capacity int) *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
return &TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
ph: NewPriorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](capacity),
}
}

// AddTx adds a transaction to the queue.
// If the queue is full, the oldest transaction is removed.
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTx(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
pq.Lock()

if pq.ph.Len() == pq.ph.Cap() {
heap.Pop(pq.ph)
Contributor:
Restating my understanding:

priorityHeap is meant to store tx ordered by creation time, with the oldest tx at index 0, and the newest tx at index length - 1.

The current implementation of Pop on priorityHeap:

func (pq *priorityHeap[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pop() any {
	old := pq.txs
	n := len(old)
	tx := old[n-1]
	old[n-1] = nil // avoid memory leak
	pq.txs = old[0 : n-1]
	delete(pq.idToIndex, tx.ID)
	return tx
}

Here, if I understand correctly, we remove the tx at index length - 1 from both the array and the index map, which is the tx with the most recent creation time.

Then, in AddTx, we have

	if pq.ph.Len() == pq.ph.Cap() {
		heap.Pop(pq.ph)
        }

So, when the queue is full, doesn't the current code remove the newest transaction?

Contributor Author:
It is using the heap pkg, so that Pop call is actually heap.Pop: https://cs.opensource.google/go/go/+/refs/tags/go1.22.1:src/container/heap/heap.go;l=59

So it takes the item that is next on the heap (the root, which is the oldest tx), not the last slice element.

}
heap.Push(pq.ph, tx)
Collaborator:
nit: I think it'd make the code slightly more readable to give the internal heap a better name, like p_heap or something, because it reflects internal state and is defined near the top of the file, whereas pq is defined on the line above and is easy to check to see what it is.

Contributor Author:
Hmm, I will think about it. Usually I stay away from renaming the standard library pkgs.


pq.Unlock()
}
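
A small standalone sketch (not code from this PR; it uses a toy int heap where a smaller value stands for an older creation time) of the eviction path questioned in the thread above: because heap.Pop removes the root rather than the last slice element, a full queue drops its oldest transaction before the new one is pushed.

package main

import (
	"container/heap"
	"fmt"
)

// intHeap: a smaller value means an older creation time.
type intHeap []int

func (h intHeap) Len() int           { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x any)        { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

func main() {
	const capacity = 3
	h := &intHeap{10, 20, 30} // at capacity; 10 is the oldest
	heap.Init(h)

	// Same shape as AddTx: evict before inserting when full.
	if h.Len() == capacity {
		fmt.Println("evicted:", heap.Pop(h)) // evicted: 10 (the oldest, not the newest)
	}
	heap.Push(h, 40)
	fmt.Println("len:", h.Len()) // len: 3
}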

// RemoveNextTx removes the next transaction to be processed from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
pq.Lock()
defer pq.Unlock()

if pq.ph.Len() == 0 {
return nil
}

return heap.Pop(pq.ph).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE])
}

// RemoveTxByID removes the transaction with the given ID from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RemoveTxByID(id int64) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
pq.Lock()
defer pq.Unlock()

if pq.ph.Len() == 0 {
return nil
}

return pq._removeTxByID(id)
Collaborator:
nit: _ looks like Python's notation. A lowercase name should be sufficient to signal that the method is private.

Contributor Author:
I am using it to try to distinguish the unsafe methods (the ones that assume the caller already holds the lock).

}

// PruneByTxIDs removes the transactions with the given IDs from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneByTxIDs(ids []int64) []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
pq.Lock()
defer pq.Unlock()

if pq.ph.Len() == 0 {
return nil
}

removed := []txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
for _, id := range ids {
if tx := pq._removeTxByID(id); tx != nil {
removed = append(removed, *tx)
}
}

return removed
}

// PeekNextTx returns the next transaction to be processed without removing it from the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
pq.RLock()
defer pq.RUnlock()

return pq.ph.Peek()
}

// Close clears the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() {
pq.Lock()
defer pq.Unlock()

pq.ph.Close()
}

// Cap returns the capacity of the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Cap() int {
pq.RLock()
defer pq.RUnlock()

return pq.ph.Cap()
}

// Len returns the length of the queue
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Len() int {
pq.RLock()
defer pq.RUnlock()

return pq.ph.Len()
}

// _removeTxByID removes the transaction with the given ID from the queue.
// This method assumes that the caller holds the lock.
func (pq *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) _removeTxByID(id int64) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
if i, ok := pq.ph.FindIndexByID(id); ok {
return heap.Remove(pq.ph, i).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE])
}

return nil
}
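
To illustrate the locking convention behind the underscore-prefixed helper discussed above, here is a minimal, non-generic sketch (not code from this PR; the queue type and its fields are hypothetical stand-ins): exported methods acquire the mutex, while the underscore-prefixed helper assumes the caller already holds it, which lets PruneByTxIDs-style batch operations remove many entries under a single lock acquisition.

package main

import (
	"fmt"
	"sync"
)

// queue is a simplified stand-in for TxPriorityQueue.
type queue struct {
	sync.RWMutex
	items map[int64]string
}

// RemoveByID is the safe, exported entry point: it takes the write lock.
func (q *queue) RemoveByID(id int64) string {
	q.Lock()
	defer q.Unlock()
	return q._removeByID(id)
}

// PruneByIDs removes many entries under a single lock acquisition,
// mirroring how PruneByTxIDs calls _removeTxByID in a loop.
func (q *queue) PruneByIDs(ids []int64) []string {
	q.Lock()
	defer q.Unlock()
	var removed []string
	for _, id := range ids {
		if v := q._removeByID(id); v != "" {
			removed = append(removed, v)
		}
	}
	return removed
}

// _removeByID is the unsafe helper: it assumes the caller holds the lock.
func (q *queue) _removeByID(id int64) string {
	v := q.items[id]
	delete(q.items, id)
	return v
}

func main() {
	q := &queue{items: map[int64]string{1: "tx-1", 2: "tx-2", 3: "tx-3"}}
	fmt.Println(q.RemoveByID(2))             // tx-2
	fmt.Println(q.PruneByIDs([]int64{1, 3})) // [tx-1 tx-3]
}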