Skip to content

Commit

Permalink
Merge pull request #947 from iotaledger/fix/scheduler-size
Browse files Browse the repository at this point in the history
Fix scheduler sizes and shutdown
  • Loading branch information
muXxer authored Apr 30, 2024
2 parents 04a114d + 4a847d5 commit 842f535
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 197 deletions.
79 changes: 31 additions & 48 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package drr

import (
"container/ring"
"fmt"
"math"
"time"

Expand All @@ -20,8 +21,9 @@ import (
type BasicBuffer struct {
activeIssuers *shrinkingmap.ShrinkingMap[iotago.AccountID, *ring.Ring]
ring *ring.Ring
// size is the number of blocks in the buffer.
size atomic.Int64

readyBlocksCount atomic.Int64
totalBlocksCount atomic.Int64

tokenBucket float64
lastScheduleTime time.Time
Expand Down Expand Up @@ -57,11 +59,6 @@ func (b *BasicBuffer) Clear() {
})
}

// Size returns the total number of blocks in BasicBuffer.
func (b *BasicBuffer) Size() int {
return int(b.size.Load())
}

// IssuerQueue returns the queue for the corresponding issuer.
func (b *BasicBuffer) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
element, exists := b.activeIssuers.Get(issuerID)
Expand Down Expand Up @@ -97,8 +94,25 @@ func (b *BasicBuffer) IssuerQueueBlockCount(issuerID iotago.AccountID) int {
}

func (b *BasicBuffer) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
issuerQueue := NewIssuerQueue(issuerID)
b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue))
element := b.activeIssuers.Compute(issuerID, func(_ *ring.Ring, exists bool) *ring.Ring {
if exists {
panic(fmt.Sprintf("issuer queue already exists: %s", issuerID.String()))
}

return b.ringInsert(NewIssuerQueue(issuerID, func(totalSizeDelta int64, readySizeDelta int64) {
if totalSizeDelta != 0 {
b.totalBlocksCount.Add(totalSizeDelta)
}
if readySizeDelta != 0 {
b.readyBlocksCount.Add(readySizeDelta)
}
}))
})

issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}

return issuerQueue
}
Expand Down Expand Up @@ -127,7 +141,7 @@ func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) {
if !isIQ {
panic("buffer contains elements that are not issuer queues")
}
b.size.Sub(int64(issuerQueue.Size()))
issuerQueue.Clear()

b.ringRemove(element)
b.activeIssuers.Delete(issuerID)
Expand Down Expand Up @@ -158,10 +172,8 @@ func (b *BasicBuffer) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu
return nil, false
}

b.size.Inc()

// if max buffer size exceeded, drop from tail of the longest mana-scaled queue
if b.Size() > maxBuffer {
if b.TotalBlocksCount() > maxBuffer {
return b.dropTail(quantumFunc, maxBuffer), true
}

Expand All @@ -178,40 +190,14 @@ func (b *BasicBuffer) Ready(block *blocks.Block) bool {
return issuerQueue.Ready(block)
}

// ReadyBlocksCount returns the number of ready blocks in the buffer.
func (b *BasicBuffer) ReadyBlocksCount() (readyBlocksCount int) {
start := b.Current()
if start == nil {
return
}

for q := start; ; {
readyBlocksCount += q.readyHeap.Len()
q = b.Next()
if q == start {
break
}
}

return
}

// TotalBlocksCount returns the number of blocks in the buffer.
func (b *BasicBuffer) TotalBlocksCount() (blocksCount int) {
start := b.Current()
if start == nil {
return
}
for q := start; ; {
blocksCount += q.readyHeap.Len()
blocksCount += q.nonReadyMap.Size()
q = b.Next()
if q == start {
break
}
}
return int(b.totalBlocksCount.Load())
}

return
// ReadyBlocksCount returns the number of ready blocks in the buffer.
func (b *BasicBuffer) ReadyBlocksCount() (readyBlocksCount int) {
return int(b.readyBlocksCount.Load())
}

// Next returns the next IssuerQueue in round-robin order.
Expand Down Expand Up @@ -250,8 +236,6 @@ func (b *BasicBuffer) PopFront() *blocks.Block {
return nil
}

b.size.Dec()

return block
}

Expand All @@ -275,7 +259,7 @@ func (b *BasicBuffer) IssuerIDs() []iotago.AccountID {

func (b *BasicBuffer) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) {
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
for b.TotalBlocksCount() > maxBuffer {
// find the longest mana-scaled queue
maxIssuerID := b.mustLongestQueueIssuerID(quantumFunc)
longestQueue := b.IssuerQueue(maxIssuerID)
Expand All @@ -288,7 +272,6 @@ func (b *BasicBuffer) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu
panic("buffer is full, but tail of longest queue does not exist")
}

b.size.Dec()
droppedBlocks = append(droppedBlocks, tail)
}

Expand Down
60 changes: 44 additions & 16 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,37 @@ import (

// IssuerQueue keeps the submitted blocks of an issuer.
type IssuerQueue struct {
issuerID iotago.AccountID
issuerID iotago.AccountID
sizeChangedFunc func(totalSizeDelta int64, readySizeDelta int64, workDelta int64)

nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block]
size atomic.Int64
work atomic.Int64

size atomic.Int64
work atomic.Int64
}

// NewIssuerQueue returns a new IssuerQueue.
func NewIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
return &IssuerQueue{
func NewIssuerQueue(issuerID iotago.AccountID, sizeChangedCallback func(totalSizeDelta int64, readySizeDelta int64)) *IssuerQueue {
queue := &IssuerQueue{
issuerID: issuerID,
nonReadyMap: shrinkingmap.New[iotago.BlockID, *blocks.Block](),
}

queue.sizeChangedFunc = func(totalSizeDelta int64, readySizeDelta int64, workDelta int64) {
if totalSizeDelta != 0 {
queue.size.Add(totalSizeDelta)
}
if workDelta != 0 {
queue.work.Add(workDelta)
}

if sizeChangedCallback != nil {
sizeChangedCallback(totalSizeDelta, readySizeDelta)
}
}

return queue
}

// Size returns the total number of blocks in the queue.
Expand Down Expand Up @@ -70,21 +88,19 @@ func (q *IssuerQueue) Submit(element *blocks.Block) bool {
}

q.nonReadyMap.Set(element.ID(), element)
q.size.Inc()
q.work.Add(int64(element.WorkScore()))
q.sizeChangedFunc(1, 0, int64(element.WorkScore()))

return true
}

// Unsubmit removes a previously submitted block from the queue.
func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool {
// unsubmit removes a previously submitted block from the queue.
func (q *IssuerQueue) unsubmit(block *blocks.Block) bool {
if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted {
return false
}

q.nonReadyMap.Delete(block.ID())
q.size.Dec()
q.work.Sub(int64(block.WorkScore()))
q.sizeChangedFunc(-1, 0, -int64(block.WorkScore()))

return true
}
Expand All @@ -98,6 +114,8 @@ func (q *IssuerQueue) Ready(block *blocks.Block) bool {
q.nonReadyMap.Delete(block.ID())
heap.Push(&q.readyHeap, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())})

q.sizeChangedFunc(0, 1, 0)

return true
}

Expand All @@ -112,6 +130,18 @@ func (q *IssuerQueue) IDs() (ids []iotago.BlockID) {
return ids
}

// Clear removes all blocks from the queue.
func (q *IssuerQueue) Clear() {
readyBlocksCount := int64(q.readyHeap.Len())

q.nonReadyMap.Clear()
for q.readyHeap.Len() > 0 {
_ = q.readyHeap.Pop()
}

q.sizeChangedFunc(-int64(q.Size()), -readyBlocksCount, -int64(q.Work()))
}

// Front returns the first ready block in the queue.
func (q *IssuerQueue) Front() *blocks.Block {
if q == nil || q.readyHeap.Len() == 0 {
Expand All @@ -132,8 +162,7 @@ func (q *IssuerQueue) PopFront() *blocks.Block {
panic("unable to pop from a non-empty heap.")
}
blk := heapElement.Value
q.size.Dec()
q.work.Sub(int64(blk.WorkScore()))
q.sizeChangedFunc(-1, -1, -int64(blk.WorkScore()))

return blk
}
Expand All @@ -152,7 +181,7 @@ func (q *IssuerQueue) RemoveTail() *blocks.Block {
heapTailIndex := q.heapTail()
// if heap tail (oldest ready block) does not exist or is newer than oldest non-ready block, unsubmit the oldest non-ready block
if oldestNonReadyBlock != nil && (heapTailIndex < 0 || q.readyHeap[heapTailIndex].Key.CompareTo(timed.HeapKey(oldestNonReadyBlock.IssuingTime())) > 0) {
if q.Unsubmit(oldestNonReadyBlock) {
if q.unsubmit(oldestNonReadyBlock) {
return oldestNonReadyBlock
}
} else if heapTailIndex < 0 { // the heap is empty
Expand All @@ -166,8 +195,7 @@ func (q *IssuerQueue) RemoveTail() *blocks.Block {
panic("trying to remove a heap element that does not exist.")
}
blk := heapElement.Value
q.size.Dec()
q.work.Sub(int64(blk.WorkScore()))
q.sizeChangedFunc(-1, -1, -int64(blk.WorkScore()))

return blk
}
Expand Down
Loading

0 comments on commit 842f535

Please sign in to comment.