Skip to content

Commit

Permalink
Fix deadlock and refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
nickeskov committed Nov 18, 2024
1 parent d13b940 commit 4dc94d5
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 26 deletions.
10 changes: 3 additions & 7 deletions cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) {
return nil, errors.Wrap(err, "failed to create state parameters")
}

var bUpdatesExtension *state.BlockchainUpdatesExtension
var bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension
if nc.enableBlockchainUpdatesPlugin {
var bUErr error
bUpdatesExtension, bUErr = runBlockchainUpdatesPlugin(ctx, cfg)
Expand Down Expand Up @@ -808,7 +808,7 @@ func runAPIs(
func runBlockchainUpdatesPlugin(
ctx context.Context,
cfg *settings.BlockchainSettings,
) (*state.BlockchainUpdatesExtension, error) {
) (*blockchaininfo.BlockchainUpdatesExtension, error) {
const l2ContractAddr = "3Msx4Aq69zWUKy4d1wyKnQ4ofzEDAfv5Ngf"

l2address, cnvrtErr := proto.NewAddressFromString(l2ContractAddr)
Expand All @@ -829,11 +829,7 @@ func runBlockchainUpdatesPlugin(
}
}()

return &state.BlockchainUpdatesExtension{
EnableBlockchainUpdatesPlugin: true,
BUpdatesChannel: updatesChannel,
L2ContractAddress: l2address,
}, nil
return blockchaininfo.NewBlockchainUpdatesExtension(ctx, l2address, updatesChannel), nil
}

func FromArgs(scheme proto.Scheme, c *config) func(s *settings.NodeSettings) error {
Expand Down
55 changes: 55 additions & 0 deletions pkg/blockchaininfo/bupdates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package blockchaininfo

import (
"context"

"github.com/wavesplatform/gowaves/pkg/proto"
)

type BlockchainUpdatesExtension struct {
ctx context.Context
enableBlockchainUpdatesPlugin bool
l2ContractAddress proto.WavesAddress
bUpdatesChannel chan<- BUpdatesInfo
}

func NewBlockchainUpdatesExtension(
ctx context.Context,
l2ContractAddress proto.WavesAddress,
bUpdatesChannel chan<- BUpdatesInfo,
) *BlockchainUpdatesExtension {
return &BlockchainUpdatesExtension{
ctx: ctx,
enableBlockchainUpdatesPlugin: true,
l2ContractAddress: l2ContractAddress,
bUpdatesChannel: bUpdatesChannel,
}
}

func (e *BlockchainUpdatesExtension) EnableBlockchainUpdatesPlugin() bool {
return e != nil && e.enableBlockchainUpdatesPlugin
}

func (e *BlockchainUpdatesExtension) L2ContractAddress() proto.WavesAddress {
return e.l2ContractAddress
}

func (e *BlockchainUpdatesExtension) WriteBUpdates(bUpdates BUpdatesInfo) {
if e.bUpdatesChannel == nil {
return
}
select {
case e.bUpdatesChannel <- bUpdates:
case <-e.ctx.Done():
e.close()
return
}
}

func (e *BlockchainUpdatesExtension) close() {
if e.bUpdatesChannel == nil {
return
}
close(e.bUpdatesChannel)
e.bUpdatesChannel = nil
}
3 changes: 2 additions & 1 deletion pkg/state/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"

"github.com/wavesplatform/gowaves/pkg/blockchaininfo"
"github.com/wavesplatform/gowaves/pkg/crypto"
"github.com/wavesplatform/gowaves/pkg/keyvalue"
"github.com/wavesplatform/gowaves/pkg/libs/ntptime"
Expand Down Expand Up @@ -232,7 +233,7 @@ func NewState(
params StateParams,
settings *settings.BlockchainSettings,
enableLightNode bool,
bUpdatesExtension *BlockchainUpdatesExtension,
bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension,
) (State, error) {
s, err := newStateManager(dataDir, amend, params, settings, enableLightNode, bUpdatesExtension)
if err != nil {
Expand Down
28 changes: 11 additions & 17 deletions pkg/state/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package state

import (
"fmt"

"github.com/mr-tron/base58/base58"
"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/blockchaininfo"
"go.uber.org/zap"

"github.com/wavesplatform/gowaves/pkg/blockchaininfo"

"github.com/wavesplatform/gowaves/pkg/crypto"
"github.com/wavesplatform/gowaves/pkg/errs"
"github.com/wavesplatform/gowaves/pkg/proto"
Expand Down Expand Up @@ -55,13 +57,7 @@ type txAppender struct {
// appending transactions.
buildApiData bool

bUpdatesExtension *BlockchainUpdatesExtension
}

type BlockchainUpdatesExtension struct {
EnableBlockchainUpdatesPlugin bool
L2ContractAddress proto.WavesAddress
BUpdatesChannel chan<- blockchaininfo.BUpdatesInfo
bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension
}

func newTxAppender(
Expand All @@ -72,7 +68,7 @@ func newTxAppender(
stateDB *stateDB,
atx *addressTransactions,
snapshotApplier *blockSnapshotsApplier,
bUpdatesExtension *BlockchainUpdatesExtension,
bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension,
) (*txAppender, error) {
buildAPIData, err := stateDB.stateStoresApiData()
if err != nil {
Expand Down Expand Up @@ -847,12 +843,10 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error {

// write updates into the updatesChannel here
// TODO possibly run it in a goroutine? make sure goroutines run in order?
if a.bUpdatesExtension != nil {
if a.bUpdatesExtension.EnableBlockchainUpdatesPlugin {
updtErr := a.updateBlockchainUpdateInfo(blockInfo, params.block)
if updtErr != nil {
return updtErr
}
if a.bUpdatesExtension != nil && a.bUpdatesExtension.EnableBlockchainUpdatesPlugin() {
updtErr := a.updateBlockchainUpdateInfo(blockInfo, params.block)
if updtErr != nil {
return updtErr
}
}

Expand All @@ -879,7 +873,7 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error {

func (a *txAppender) updateBlockchainUpdateInfo(blockInfo *proto.BlockInfo, blockHeader *proto.BlockHeader) error {
// TODO improve this by using diffs instead grabbing all the records every time
dataEntries, err := a.ia.state.RetrieveEntries(proto.NewRecipientFromAddress(a.bUpdatesExtension.L2ContractAddress))
dataEntries, err := a.ia.state.RetrieveEntries(proto.NewRecipientFromAddress(a.bUpdatesExtension.L2ContractAddress()))
if err != nil && !errors.Is(err, proto.ErrNotFound) {
return err
}
Expand All @@ -896,7 +890,7 @@ func (a *txAppender) updateBlockchainUpdateInfo(blockInfo *proto.BlockInfo, bloc
Height: blockInfo.Height,
},
}
a.bUpdatesExtension.BUpdatesChannel <- bUpdatesInfo
a.bUpdatesExtension.WriteBUpdates(bUpdatesInfo)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/wavesplatform/gowaves/pkg/blockchaininfo"
"github.com/wavesplatform/gowaves/pkg/consensus"
"github.com/wavesplatform/gowaves/pkg/crypto"
"github.com/wavesplatform/gowaves/pkg/errs"
Expand Down Expand Up @@ -519,7 +520,7 @@ func newStateManager(
params StateParams,
settings *settings.BlockchainSettings,
enableLightNode bool,
bUpdatesExtension *BlockchainUpdatesExtension,
bUpdatesExtension *blockchaininfo.BlockchainUpdatesExtension,
) (_ *stateManager, retErr error) {
if err := validateSettings(settings); err != nil {
return nil, err
Expand Down

0 comments on commit 4dc94d5

Please sign in to comment.