Skip to content

Commit

Permalink
Merge pull request #737 from iotaledger/fix/memleak-protocol
Browse files Browse the repository at this point in the history
Fix chain managment logic
  • Loading branch information
piotrm50 authored Feb 28, 2024
2 parents 24cf994 + 076f3f7 commit adf7bd0
Show file tree
Hide file tree
Showing 13 changed files with 822 additions and 207 deletions.
67 changes: 59 additions & 8 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type Chain struct {
// IsEvicted contains a flag that indicates whether this chain was evicted.
IsEvicted reactive.Event

// shouldEvict contains a flag that indicates whether this chain should be evicted.
shouldEvict reactive.Event

// chains contains a reference to the Chains instance that this chain belongs to.
chains *Chains

Expand All @@ -83,6 +86,7 @@ func newChain(chains *Chains) *Chain {
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),
shouldEvict: reactive.NewEvent(),

chains: chains,
commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](),
Expand Down Expand Up @@ -126,6 +130,10 @@ func (c *Chain) LastCommonSlot() iotago.SlotIndex {
func (c *Chain) DispatchBlock(block *model.Block, src peer.ID) (dispatched bool) {
if c == nil {
return false
} else if c.IsEvicted.Get() {
c.LogTrace("discard for evicted chain", "commitmentID", block.ProtocolBlock().Header.SlotCommitmentID, "blockID", block.ID())

return true
}

dispatched = c.dispatchBlockToSpawnedEngine(block, src)
Expand All @@ -146,7 +154,7 @@ func (c *Chain) Commitment(slot iotago.SlotIndex) (commitment *Commitment, exist
case slot > forkingPoint.Slot():
return currentChain.commitments.Get(slot)
default:
currentChain = c.ParentChain.Get()
currentChain = currentChain.ParentChain.Get()
}
}

Expand Down Expand Up @@ -181,22 +189,57 @@ func (c *Chain) initLogger() (shutdown func()) {
c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
c.Engine.LogUpdates(c, log.LevelTrace, "Engine", (*engine.Engine).LogName),
c.IsEvicted.LogUpdates(c, log.LevelTrace, "IsEvicted"),
c.shouldEvict.LogUpdates(c, log.LevelTrace, "shouldEvict"),

c.Logger.UnsubscribeFromParentLogger,
)
}

// initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties.
func (c *Chain) initDerivedProperties() (shutdown func()) {
return lo.Batch(
return lo.BatchReverse(
c.deriveWarpSyncMode(),

c.ForkingPoint.WithValue(c.deriveParentChain),
c.ParentChain.WithNonEmptyValue(lo.Bind(c, (*Chain).deriveChildChains)),
c.shouldEvict.OnTrigger(func() { go c.IsEvicted.Trigger() }),

lo.BatchReverse(
c.ForkingPoint.WithNonEmptyValue(func(forkingPoint *Commitment) (teardown func()) {
return lo.BatchReverse(
c.deriveParentChain(forkingPoint),

c.ParentChain.WithValue(func(parentChain *Chain) (teardown func()) {
return lo.BatchReverse(
parentChain.deriveChildChains(c),

c.deriveShouldEvict(forkingPoint, parentChain),
)
}),
)
}),
),

c.Engine.WithNonEmptyValue(c.deriveOutOfSyncThreshold),
)
}

// deriveShouldEvict defines how a chain determines whether it should be evicted (if it is not the main chain and either
// its forking point or its parent chain is evicted).
func (c *Chain) deriveShouldEvict(forkingPoint *Commitment, parentChain *Chain) (shutdown func()) {
if forkingPoint != nil && parentChain != nil {
return c.shouldEvict.DeriveValueFrom(reactive.NewDerivedVariable2(func(_, forkingPointIsEvicted bool, parentChainIsEvicted bool) bool {
return c.chains.Main.Get() != c && (forkingPointIsEvicted || parentChainIsEvicted)
}, forkingPoint.IsEvicted, parentChain.IsEvicted))
}

if forkingPoint != nil {
return c.shouldEvict.DeriveValueFrom(reactive.NewDerivedVariable(func(_, forkingPointIsEvicted bool) bool {
return c.chains.Main.Get() != c && forkingPointIsEvicted
}, forkingPoint.IsEvicted))
}

return
}

// deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not.
func (c *Chain) deriveWarpSyncMode() func() {
return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestSyncedSlot iotago.SlotIndex, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
Expand All @@ -211,12 +254,16 @@ func (c *Chain) deriveWarpSyncMode() func() {
}

// deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set).
func (c *Chain) deriveChildChains(child *Chain) func() {
c.ChildChains.Add(child)
func (c *Chain) deriveChildChains(child *Chain) (teardown func()) {
if c != nil && c != child {
c.ChildChains.Add(child)

return func() {
c.ChildChains.Delete(child)
teardown = func() {
c.ChildChains.Delete(child)
}
}

return
}

// deriveParentChain defines how a chain determines its parent chain from its forking point (it inherits the Chain from
Expand Down Expand Up @@ -261,6 +308,10 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsVerified.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
newCommitment.IsSynced.OnTrigger(func() { c.LatestSyncedSlot.Set(newCommitment.Slot()) }),

func() {
c.commitments.Delete(newCommitment.Slot())
},
)
}

Expand Down
32 changes: 13 additions & 19 deletions pkg/protocol/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,11 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
c.CumulativeWeight.Set(c.Commitment.CumulativeWeight())
}

return lo.Batch(
parent.deriveChildren(c),
parent.registerChild(c)

return lo.BatchReverse(
c.deriveChain(parent),

c.deriveCumulativeAttestedWeight(parent),
c.deriveIsAboveLatestVerifiedCommitment(parent),

Expand All @@ -277,25 +278,16 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
)
}

// deriveChildren derives the children of this Commitment by adding the given child to the Children set.
func (c *Commitment) deriveChildren(child *Commitment) (unregisterChild func()) {
// registerChild adds the given Commitment as a child of this Commitment and sets it as the main child if it is the
// first child of this Commitment.
func (c *Commitment) registerChild(child *Commitment) {
c.MainChild.Compute(func(mainChild *Commitment) *Commitment {
if !c.Children.Add(child) || mainChild != nil {
return mainChild
}

return child
})

return func() {
c.MainChild.Compute(func(mainChild *Commitment) *Commitment {
if !c.Children.Delete(child) || child != mainChild {
return mainChild
}

return lo.Return1(c.Children.Any())
})
}
}

// deriveChain derives the Chain of this Commitment which is either inherited from the parent if we are the main child
Expand All @@ -307,19 +299,21 @@ func (c *Commitment) deriveChain(parent *Commitment) func() {
return currentChain
}

// if we are not the main child of our parent, we spawn a new chain
// If we are not the main child of our parent, we spawn a new chain.
// Here we basically move commitments to a new chain if there's a fork.
if c != mainChild {
if currentChain == nil {
if currentChain == nil || currentChain == parentChain {
currentChain = c.commitments.protocol.Chains.newChain()
currentChain.ForkingPoint.Set(c)
}

return currentChain
}

// if we are the main child of our parent, and our chain is not the parent chain (that we are supposed to
// inherit), then we evict our current chain (we will spawn a new one if we ever change back to not being the
// main child)
// If we are the main child of our parent, and our chain is not the parent chain,
// then we inherit the parent chain and evict the current one.
// We will spawn a new one if we ever change back to not being the main child.
// Here we basically move commitments to the parent chain.
if currentChain != nil && currentChain != parentChain {
currentChain.IsEvicted.Trigger()
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/protocol/commitments.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ func (c *Commitments) publishRootCommitment(mainChain *Chain, mainEngine *engine
publishedCommitment.Chain.Set(mainChain)
}

// TODO: USE SET HERE (debug eviction issues)
mainChain.ForkingPoint.DefaultTo(publishedCommitment)
// Update the forking point of a chain only if the root is empty or root belongs to the main chain or the published commitment is on the main chain.
// to avoid updating ForkingPoint of the new mainChain into the past.
if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain {
mainChain.ForkingPoint.Set(publishedCommitment)
}

c.Root.Set(publishedCommitment)
})
Expand Down Expand Up @@ -227,7 +230,7 @@ func (c *Commitments) publishCommitment(commitment *model.Commitment) (published
func (c *Commitments) cachedRequest(commitmentID iotago.CommitmentID, requestIfMissing ...bool) *promise.Promise[*Commitment] {
// handle evicted slots
slotEvicted := c.protocol.EvictionEvent(commitmentID.Index())
if slotEvicted.WasTriggered() && c.protocol.LastEvictedSlot() != 0 {
if slotEvicted.WasTriggered() {
return promise.New[*Commitment]().Reject(ErrorSlotEvicted)
}

Expand Down Expand Up @@ -271,9 +274,11 @@ func (c *Commitments) initCommitment(commitment *Commitment, slotEvicted reactiv
commitment.LogDebug("created", "id", commitment.ID())

// solidify the parent of the commitment
c.cachedRequest(commitment.PreviousCommitmentID(), true).OnSuccess(func(parent *Commitment) {
commitment.Parent.Set(parent)
})
if root := c.Root.Get(); root != nil && commitment.Slot() > root.Slot() {
c.cachedRequest(commitment.PreviousCommitmentID(), true).OnSuccess(func(parent *Commitment) {
parent.IsEvicted.OnTrigger(commitment.Parent.ToggleValue(parent))
})
}

// add commitment to the set
c.Add(commitment)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/confirmation_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestConfirmationFlags(t *testing.T) {
testsuite.WithProtocolParameters(ts.API.ProtocolParameters()),
testsuite.WithLatestCommitment(genesisCommitment),
testsuite.WithLatestFinalizedSlot(0),
testsuite.WithChainID(genesisCommitment.MustID()),
testsuite.WithMainChainID(genesisCommitment.MustID()),
testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}),
testsuite.WithSybilProtectionCommittee(0, expectedCommittee),
testsuite.WithSybilProtectionOnlineCommittee(lo.Return1(lo.Return1(nodeA.Protocol.Engines.Main.Get().SybilProtection.SeatManager().CommitteeInSlot(1)).GetSeat(nodeA.Validator.AccountID))),
Expand Down
8 changes: 4 additions & 4 deletions pkg/tests/protocol_engine_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestProtocol_EngineRollbackFinalization(t *testing.T) {
testsuite.WithProtocolParameters(ts.API.ProtocolParameters()),
testsuite.WithLatestCommitment(genesisCommitment),
testsuite.WithLatestFinalizedSlot(0),
testsuite.WithChainID(genesisCommitment.MustID()),
testsuite.WithMainChainID(genesisCommitment.MustID()),
testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}),

testsuite.WithSybilProtectionCommittee(0, expectedCommittee),
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestProtocol_EngineRollbackNoFinalization(t *testing.T) {
testsuite.WithProtocolParameters(ts.API.ProtocolParameters()),
testsuite.WithLatestCommitment(genesisCommitment),
testsuite.WithLatestFinalizedSlot(0),
testsuite.WithChainID(genesisCommitment.MustID()),
testsuite.WithMainChainID(genesisCommitment.MustID()),
testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}),

testsuite.WithSybilProtectionCommittee(0, expectedCommittee),
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestProtocol_EngineRollbackNoFinalizationLastSlot(t *testing.T) {
testsuite.WithProtocolParameters(ts.API.ProtocolParameters()),
testsuite.WithLatestCommitment(genesisCommitment),
testsuite.WithLatestFinalizedSlot(0),
testsuite.WithChainID(genesisCommitment.MustID()),
testsuite.WithMainChainID(genesisCommitment.MustID()),
testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}),

testsuite.WithSybilProtectionCommittee(0, expectedCommittee),
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestProtocol_EngineRollbackNoFinalizationBeforePointOfNoReturn(t *testing.T
testsuite.WithProtocolParameters(ts.API.ProtocolParameters()),
testsuite.WithLatestCommitment(genesisCommitment),
testsuite.WithLatestFinalizedSlot(0),
testsuite.WithChainID(genesisCommitment.MustID()),
testsuite.WithMainChainID(genesisCommitment.MustID()),
testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}),

testsuite.WithSybilProtectionCommittee(0, expectedCommittee),
Expand Down
Loading

0 comments on commit adf7bd0

Please sign in to comment.