From 829343c100856740369b50686aae426ea021506e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 25 Oct 2024 03:18:02 +0000 Subject: [PATCH 1/9] Bump actions/setup-go from 5.0.2 to 5.1.0 (#1521) --- .github/workflows/codeql-analysis.yml | 2 +- .github/workflows/go.yml | 6 +++--- .github/workflows/itests.yml | 2 +- .github/workflows/itests_race.yml | 2 +- .github/workflows/security.yml | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 4558bcf7e..8dd7177c5 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -42,7 +42,7 @@ jobs: uses: actions/checkout@v4 - name: Set up Go 1.22 - uses: actions/setup-go@v5.0.2 + uses: actions/setup-go@v5.1.0 with: go-version: 1.22.x check-latest: true diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 9cee80a54..cc64bba9e 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -16,7 +16,7 @@ jobs: with: fetch-depth: 0 - name: Set up Go 1.22 - uses: actions/setup-go@v5.0.2 + uses: actions/setup-go@v5.1.0 with: go-version: 1.22.x check-latest: true @@ -39,7 +39,7 @@ jobs: steps: - name: Set up Go 1.22 - uses: actions/setup-go@v5.0.2 + uses: actions/setup-go@v5.1.0 with: go-version: 1.22.x check-latest: true @@ -75,7 +75,7 @@ jobs: steps: - name: Set up Go 1.22 - uses: actions/setup-go@v5.0.2 + uses: actions/setup-go@v5.1.0 with: go-version: 1.22.x check-latest: true diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index 35018311f..6d0acc477 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -15,7 +15,7 @@ jobs: steps: - name: Set up Go 1.22 - uses: actions/setup-go@v5.0.2 + uses: actions/setup-go@v5.1.0 with: go-version: 1.22.x check-latest: true diff --git a/.github/workflows/itests_race.yml b/.github/workflows/itests_race.yml index 4c040cd9c..42c3402c9 100644 --- a/.github/workflows/itests_race.yml +++ b/.github/workflows/itests_race.yml @@ -12,7 +12,7 @@ jobs: steps: - name: Set up Go 1.22 - uses: actions/setup-go@v5.0.2 + uses: actions/setup-go@v5.1.0 with: go-version: 1.22.x check-latest: true diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index 600019660..c284b76da 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -65,7 +65,7 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v5.0.2 + uses: actions/setup-go@v5.1.0 with: go-version: 1.22.x check-latest: true From a7c6a468b9000dd31517fcc28bc1be065fc0c959 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Mon, 28 Oct 2024 11:44:08 +0300 Subject: [PATCH 2/9] Close state resources in case of initializatino failure (#1523) * Update 'dist-node' Makefile target. * Close internal state db in case of initialization failure. * Close internal resources of 'blockReadWriter' in case of initialization failure. * Simplified history storage constructor. * Close internal resources of 'addressTransactions' initialization failure. * Close internal resources of 'stateManager' in case of initialization error. --- Makefile | 2 +- pkg/state/address_transactions.go | 14 ++- pkg/state/blockreadwriter.go | 110 +++++++++++++++--------- pkg/state/common_test.go | 3 +- pkg/state/history_formatter.go | 4 +- pkg/state/history_formatter_test.go | 4 +- pkg/state/history_storage.go | 14 +-- pkg/state/local_history_storage.go | 4 +- pkg/state/local_history_storage_test.go | 5 +- pkg/state/state.go | 63 +++++++++++--- 10 files changed, 143 insertions(+), 80 deletions(-) diff --git a/Makefile b/Makefile index d51ca49ba..8c000ad4c 100644 --- a/Makefile +++ b/Makefile @@ -123,7 +123,7 @@ build-node-windows-amd64: release-node: ver build-node-linux-amd64 build-node-linux-i386 build-node-linux-arm64 build-node-linux-arm build-node-darwin-amd64 build-node-windows-amd64 -dist-node: release-node build-node-mainnet-amd64-deb-package build-node-testnet-amd64-deb-package build-node-testnet-arm64-deb-package build-node-stagenet-amd64-deb-package build-node-stagenet-arm64-deb-package +dist-node: release-node build-node-mainnet-amd64-deb-package build-node-mainnet-arm64-deb-package build-node-testnet-amd64-deb-package build-node-testnet-arm64-deb-package build-node-stagenet-amd64-deb-package build-node-stagenet-arm64-deb-package @mkdir -p build/dist @cd ./build/; zip -j ./dist/node_$(VERSION)_Windows-amd64.zip ./bin/windows-amd64/node* @cd ./build/bin/linux-amd64/; tar pzcvf ../../dist/node_$(VERSION)_Linux-amd64.tar.gz ./node* diff --git a/pkg/state/address_transactions.go b/pkg/state/address_transactions.go index f85931de9..65f273eec 100644 --- a/pkg/state/address_transactions.go +++ b/pkg/state/address_transactions.go @@ -3,6 +3,7 @@ package state import ( "bufio" "encoding/binary" + stderrs "errors" "io" "os" "path/filepath" @@ -154,7 +155,7 @@ func newAddressTransactions( rw *blockReadWriter, params *addressTransactionsParams, amend bool, -) (*addressTransactions, error) { +) (_ *addressTransactions, retErr error) { bsParams := &batchedStorParams{ maxBatchSize: maxTransactionIdsBatchSize, recordSize: txMetaSize, @@ -165,6 +166,13 @@ func newAddressTransactions( if err != nil { return nil, err } + defer func() { + if retErr != nil { + if fErr := addrTransactionsFile.Close(); fErr != nil { + retErr = stderrs.Join(retErr, errors.Wrap(fErr, "failed to close address_transactions file")) + } + } + }() if err := manageFile(addrTransactionsFile, db); err != nil { return nil, err } @@ -183,8 +191,8 @@ func newAddressTransactions( amend: amend, } if params.providesData { - if err := atx.persist(); err != nil { - return nil, errors.Wrap(err, "failed to persist") + if pErr := atx.persist(); pErr != nil { // no need to close atx here because all resources will be closed above + return nil, errors.Wrap(pErr, "failed to persist") } } return atx, nil diff --git a/pkg/state/blockreadwriter.go b/pkg/state/blockreadwriter.go index 030a3a926..aa34fbfd1 100644 --- a/pkg/state/blockreadwriter.go +++ b/pkg/state/blockreadwriter.go @@ -3,6 +3,8 @@ package state import ( "bufio" "encoding/binary" + stderrs "errors" + "math" "os" "path/filepath" "sync" @@ -169,13 +171,16 @@ type blockReadWriter struct { addingBlock bool // Protobuf-related stuff. - protobufActivated bool - protobufTxStart, protobufHeadersStart uint64 - protobufAfterHeight uint64 + protobufInfoWithActivation mtx sync.RWMutex } +type protobufInfoWithActivation struct { + protobufActivated bool + protobufInfo +} + // openOrCreateForAppending function opens file if it exists or creates new in other case. func openOrCreateForAppending(path string) (*os.File, uint64, error) { file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600) // #nosec: in this case check for prevent G304 (CWE-22) is not necessary @@ -199,19 +204,43 @@ func newBlockReadWriter( headerOffsetLen int, stateDB *stateDB, scheme proto.Scheme, -) (*blockReadWriter, error) { +) (_ *blockReadWriter, retErr error) { + if offsetLen < 0 { + return nil, errors.New("negative offset length") + } blockchain, blockchainSize, err := openOrCreateForAppending(filepath.Join(dir, "blockchain")) if err != nil { return nil, err } + defer func() { + if retErr != nil { + if fErr := blockchain.Close(); fErr != nil { + retErr = stderrs.Join(retErr, errors.Wrap(fErr, "failed to close blockchain file")) + } + } + }() headers, headersSize, err := openOrCreateForAppending(filepath.Join(dir, "headers")) if err != nil { return nil, err } + defer func() { + if retErr != nil { + if fErr := headers.Close(); fErr != nil { + retErr = stderrs.Join(retErr, errors.Wrap(fErr, "failed to close headers file")) + } + } + }() blockHeight2ID, _, err := openOrCreateForAppending(filepath.Join(dir, "block_height_to_id")) if err != nil { return nil, err } + defer func() { + if retErr != nil { + if fErr := blockHeight2ID.Close(); fErr != nil { + retErr = stderrs.Join(retErr, errors.Wrap(fErr, "failed to close block_height_to_id file")) + } + } + }() if offsetLen != 8 { // TODO: support different offset lengths. return nil, errors.New("only offsetLen 8 is currently supported") @@ -224,33 +253,35 @@ func newBlockReadWriter( if err != nil { return nil, errors.Errorf("failed to retrieve height: %v", err) } + pbInfo, err := loadProtobufInfo(stateDB.db) + if err != nil { + return nil, errors.Wrap(err, "failed to load protobuf info") + } rw := &blockReadWriter{ - db: stateDB.db, - dbBatch: stateDB.dbBatch, - stateDB: stateDB, - scheme: scheme, - blockchain: blockchain, - headers: headers, - blockHeight2ID: blockHeight2ID, - blockchainBuf: bufio.NewWriter(blockchain), - headersBuf: bufio.NewWriter(headers), - blockHeight2IDBuf: bufio.NewWriter(blockHeight2ID), - rtx: newRecentTransactions(), - rheaders: make(map[proto.BlockID]proto.BlockHeader), - blockInfo: make(map[proto.BlockID]blockMeta), - height2IDCache: make(map[uint64]proto.BlockID), - offsetEnd: uint64(1< Date: Mon, 28 Oct 2024 12:28:01 +0300 Subject: [PATCH 3/9] Add concurrency settings for CI workflows. (#1522) * Add concurrency settings for CI workflows. * Update concurrency gropus names. * Update 'cancel-in-progress' concurrency condition. --------- Co-authored-by: Alexey Kiselev --- .github/workflows/codeql-analysis.yml | 4 ++++ .github/workflows/go.yml | 4 ++++ .github/workflows/itests.yml | 4 ++++ .github/workflows/security.yml | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 8dd7177c5..d549e5238 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -21,6 +21,10 @@ on: schedule: - cron: '15 12 * * 3' +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/master' && github.event_name != 'workflow_dispatch' }} + jobs: analyze: name: Analyze diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index cc64bba9e..875bf406f 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -1,6 +1,10 @@ name: build on: [ push, pull_request, workflow_dispatch ] +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/master' && github.event_name != 'workflow_dispatch' }} + jobs: golangci: name: lint diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index 6d0acc477..efbfd90f0 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -5,6 +5,10 @@ on: types: [ submitted ] branches: [ master ] +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/master' && github.event_name != 'workflow_dispatch' }} + jobs: itest: name: integration_tests diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index c284b76da..62ceabe87 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -8,6 +8,10 @@ on: schedule: - cron: '30 12 * * 3' +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/master' && github.event_name != 'workflow_dispatch' }} + jobs: gosec: name: gosec check From d1e4e1095d5131342135c78223f968d76378a604 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Tue, 29 Oct 2024 14:50:20 +0300 Subject: [PATCH 4/9] Update '.gitattributes'. (#1524) --- .gitattributes | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitattributes b/.gitattributes index 8c6d6b2cf..a5f49af6a 100644 --- a/.gitattributes +++ b/.gitattributes @@ -5,3 +5,4 @@ *.bat text eol=crlf *.cmd text eol=crlf importer.pgo -text +pkg/state/testdata/blocks-10000 -text From 50ce41db1274386a0f3f54fcebb519b2012a7326 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Wed, 30 Oct 2024 21:20:10 +0300 Subject: [PATCH 5/9] Update '.gitattributes'. (#1526) Add auto-detection of text files. --- .gitattributes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitattributes b/.gitattributes index a5f49af6a..96ecba735 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,4 +1,4 @@ -* text eol=lf +* text=auto eol=lf # Windows .bat files are known to have multiple bugs when run with LF # endings, and so they are checked in with CRLF endings. From 1a229720c996500ec57fd2997b6559e90bbabca6 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Thu, 31 Oct 2024 11:58:42 +0300 Subject: [PATCH 6/9] Remove 'pkg/libs/runner' package. (#1527) --- cmd/node/node.go | 5 --- pkg/api/node.go | 4 -- pkg/api/node_api.go | 8 ---- pkg/api/routes.go | 1 - pkg/libs/runner/asynchronous.go | 15 -------- pkg/libs/runner/asynchronous_test.go | 15 -------- pkg/libs/runner/interface.go | 6 --- pkg/libs/runner/loggable.go | 56 ---------------------------- pkg/libs/runner/loggable_test.go | 32 ---------------- pkg/libs/runner/synchronous.go | 15 -------- pkg/libs/runner/synchronous_test.go | 17 --------- pkg/node/node.go | 20 ++++------ pkg/services/services.go | 2 - 13 files changed, 8 insertions(+), 188 deletions(-) delete mode 100644 pkg/libs/runner/asynchronous.go delete mode 100644 pkg/libs/runner/asynchronous_test.go delete mode 100644 pkg/libs/runner/interface.go delete mode 100644 pkg/libs/runner/loggable.go delete mode 100644 pkg/libs/runner/loggable_test.go delete mode 100644 pkg/libs/runner/synchronous.go delete mode 100644 pkg/libs/runner/synchronous_test.go diff --git a/cmd/node/node.go b/cmd/node/node.go index 844a70962..86115ee2c 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -26,7 +26,6 @@ import ( "github.com/wavesplatform/gowaves/pkg/grpc/server" "github.com/wavesplatform/gowaves/pkg/libs/microblock_cache" "github.com/wavesplatform/gowaves/pkg/libs/ntptime" - "github.com/wavesplatform/gowaves/pkg/libs/runner" "github.com/wavesplatform/gowaves/pkg/logging" "github.com/wavesplatform/gowaves/pkg/metrics" "github.com/wavesplatform/gowaves/pkg/miner" @@ -409,9 +408,6 @@ func main() { return } - async := runner.NewAsync() - logRunner := runner.NewLogRunner(async) - declAddr := proto.NewTCPAddrFromString(conf.DeclaredAddr) bindAddr := proto.NewTCPAddrFromString(nc.bindAddress) @@ -481,7 +477,6 @@ func main() { BlocksApplier: blockApplier, UtxPool: utx, Scheme: cfg.AddressSchemeCharacter, - LoggableRunner: logRunner, Time: ntpTime, Wallet: wal, MicroBlockCache: microblock_cache.NewMicroBlockCache(), diff --git a/pkg/api/node.go b/pkg/api/node.go index f5e65176f..e4c50d7a0 100644 --- a/pkg/api/node.go +++ b/pkg/api/node.go @@ -13,7 +13,3 @@ type nodeVersion struct { func (a *App) version() nodeVersion { return nodeVersion{Version: fmt.Sprintf("Gowaves %s", versioning.Version)} } - -func (a *App) NodeProcesses() map[string]int { - return a.services.LoggableRunner.Running() -} diff --git a/pkg/api/node_api.go b/pkg/api/node_api.go index 167e41e9f..1759448c6 100644 --- a/pkg/api/node_api.go +++ b/pkg/api/node_api.go @@ -780,14 +780,6 @@ func (a *NodeApi) Addresses(w http.ResponseWriter, _ *http.Request) error { return nil } -func (a *NodeApi) nodeProcesses(w http.ResponseWriter, _ *http.Request) error { - rs := a.app.NodeProcesses() - if err := trySendJson(w, rs); err != nil { - return errors.Wrap(err, "nodeProcesses") - } - return nil -} - func (a *NodeApi) stateHashDebug(height proto.Height) (*proto.StateHashDebug, error) { stateHash, err := a.state.LegacyStateHashAtHeight(height) if err != nil { diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 868ebf8e4..4e4f8a153 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -95,7 +95,6 @@ func (a *NodeApi) routes(opts *RunOptions) (chi.Router, error) { }) r.Get("/miner/info", wrapper(a.GoMinerInfo)) - r.Get("/node/processes", wrapper(a.nodeProcesses)) r.Get("/pool/transactions", wrapper(a.poolTransactions)) }) diff --git a/pkg/libs/runner/asynchronous.go b/pkg/libs/runner/asynchronous.go deleted file mode 100644 index e6e78e2f7..000000000 --- a/pkg/libs/runner/asynchronous.go +++ /dev/null @@ -1,15 +0,0 @@ -package runner - -// asynchronous for async funcs -type asynchronous struct { -} - -// Go run func asynchronous -func (a asynchronous) Go(f func()) { - go f() -} - -// NewAsync create new asynchronous -func NewAsync() asynchronous { - return asynchronous{} -} diff --git a/pkg/libs/runner/asynchronous_test.go b/pkg/libs/runner/asynchronous_test.go deleted file mode 100644 index d3b9ad87b..000000000 --- a/pkg/libs/runner/asynchronous_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package runner - -import ( - "testing" -) - -// if test incorrect, it will hang forever -func TestAsynchronous_Go(t *testing.T) { - s := NewAsync() - ch := make(chan int) - s.Go(func() { - ch <- 1 - }) - <-ch -} diff --git a/pkg/libs/runner/interface.go b/pkg/libs/runner/interface.go deleted file mode 100644 index ec3e99e23..000000000 --- a/pkg/libs/runner/interface.go +++ /dev/null @@ -1,6 +0,0 @@ -package runner - -// Runner run asynchronous or synchronous -type Runner interface { - Go(func()) -} diff --git a/pkg/libs/runner/loggable.go b/pkg/libs/runner/loggable.go deleted file mode 100644 index a0a7a993c..000000000 --- a/pkg/libs/runner/loggable.go +++ /dev/null @@ -1,56 +0,0 @@ -package runner - -import ( - "maps" - "sync" -) - -type LogRunner interface { - Named(name string, f func()) (done <-chan struct{}) - Running() map[string]int -} - -type log struct { - mu sync.Mutex - r Runner - running map[string]int -} - -func NewLogRunner(r Runner) *log { - return &log{ - r: r, - running: make(map[string]int), - } -} - -func (a *log) addNamed(name string) { - a.mu.Lock() - defer a.mu.Unlock() - a.running[name] += 1 -} - -func (a *log) removeNamed(name string) { - a.mu.Lock() - defer a.mu.Unlock() - a.running[name] -= 1 - if a.running[name] == 0 { - delete(a.running, name) - } -} - -func (a *log) Named(name string, f func()) <-chan struct{} { - done := make(chan struct{}) - a.r.Go(func() { - defer close(done) - a.addNamed(name) - defer a.removeNamed(name) - f() - }) - return done -} - -func (a *log) Running() map[string]int { - a.mu.Lock() - defer a.mu.Unlock() - return maps.Clone(a.running) -} diff --git a/pkg/libs/runner/loggable_test.go b/pkg/libs/runner/loggable_test.go deleted file mode 100644 index 0271d1abe..000000000 --- a/pkg/libs/runner/loggable_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package runner - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestNewLoggableRunner(t *testing.T) { - require.NotNil(t, NewLogRunner(nil)) -} - -func TestLog_Named(t *testing.T) { - a := NewLogRunner(NewAsync()) - require.Len(t, a.Running(), 0) - - started := make(chan int, 1) - ended := make(chan int, 1) - - require.Len(t, a.Running(), 0) - - done := a.Named("some", func() { - started <- 1 - <-ended - }) - - <-started - require.Len(t, a.Running(), 1) - ended <- 1 - <-done - require.Len(t, a.Running(), 0) -} diff --git a/pkg/libs/runner/synchronous.go b/pkg/libs/runner/synchronous.go deleted file mode 100644 index 9877ba5d8..000000000 --- a/pkg/libs/runner/synchronous.go +++ /dev/null @@ -1,15 +0,0 @@ -package runner - -// synchronous for sync funcs -type synchronous struct { -} - -// // Go run func synchronous -func (a synchronous) Go(f func()) { - f() -} - -// NewAsync create new synchronous -func NewSync() synchronous { - return synchronous{} -} diff --git a/pkg/libs/runner/synchronous_test.go b/pkg/libs/runner/synchronous_test.go deleted file mode 100644 index d131c524e..000000000 --- a/pkg/libs/runner/synchronous_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package runner - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -// incorrect test will be catched by race detector -func TestSynchronous_Go(t *testing.T) { - i := 0 - s := NewSync() - s.Go(func() { - i++ - }) - require.Equal(t, 1, i) -} diff --git a/pkg/node/node.go b/pkg/node/node.go index f8977789a..dfc73b6dd 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -2,7 +2,6 @@ package node import ( "context" - "fmt" "net" "reflect" "time" @@ -13,7 +12,6 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" - "github.com/wavesplatform/gowaves/pkg/libs/runner" "github.com/wavesplatform/gowaves/pkg/node/fsm" "github.com/wavesplatform/gowaves/pkg/node/fsm/tasks" "github.com/wavesplatform/gowaves/pkg/node/messages" @@ -156,7 +154,7 @@ func (a *Node) Run( zap.S().Errorf("Failed to create FSM: %v", err) return } - spawnAsync(ctx, tasksCh, a.services.LoggableRunner, async) + spawnAsync(ctx, tasksCh, async) actions := createActions() for { @@ -207,7 +205,7 @@ func (a *Node) Run( if err != nil { a.logErrors(err) } - spawnAsync(ctx, tasksCh, a.services.LoggableRunner, async) + spawnAsync(ctx, tasksCh, async) } } @@ -247,15 +245,13 @@ func (a *Node) runOutgoingConnections(ctx context.Context) { } } -func spawnAsync(ctx context.Context, ch chan tasks.AsyncTask, r runner.LogRunner, a fsm.Async) { +func spawnAsync(ctx context.Context, ch chan tasks.AsyncTask, a fsm.Async) { for _, t := range a { - func(t tasks.Task) { - _ = r.Named(fmt.Sprintf("Async Task %T", t), func() { - err := t.Run(ctx, ch) - if err != nil && !errors.Is(err, context.Canceled) { - zap.S().Warnf("Async task '%T' finished with error: %q", t, err) - } - }) + go func(t tasks.Task) { + err := t.Run(ctx, ch) + if err != nil && !errors.Is(err, context.Canceled) { + zap.S().Warnf("Async task '%T' finished with error: %q", t, err) + } }(t) } } diff --git a/pkg/services/services.go b/pkg/services/services.go index 635c5db96..b582fd255 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -1,7 +1,6 @@ package services import ( - "github.com/wavesplatform/gowaves/pkg/libs/runner" "github.com/wavesplatform/gowaves/pkg/node/messages" "github.com/wavesplatform/gowaves/pkg/node/peers" "github.com/wavesplatform/gowaves/pkg/proto" @@ -52,7 +51,6 @@ type Services struct { UtxPool types.UtxPool Scheme proto.Scheme InvRequester types.InvRequester - LoggableRunner runner.LogRunner Time types.Time Wallet types.EmbeddedWallet MicroBlockCache MicroBlockCache From c8d88c98a72e9589da688f5762e938e53bf40da7 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Thu, 31 Oct 2024 14:18:50 +0300 Subject: [PATCH 7/9] Node return non zero error code if error occurred (#1528) * WIP * Fixed shutdown process of node HTTP API. * Change singature of 'PeerManager.Close' method. * Change singature of 'Node.Close' method. * Close all resourses in 'newHaltState' even in case of error. * Refactor 'node' main function. Now it returns non-zero code and closes all resourses in case of error. * Reorder functions. * Fix lint. * Dismiss 'gosec' G115 alert. * Fix typo. --- cmd/node/node.go | 673 ++++++++++++++++++++++----------- pkg/api/node_api.go | 29 +- pkg/miner/features.go | 8 +- pkg/miner/features_test.go | 10 - pkg/mock/peer_manager.go | 6 +- pkg/node/fsm/halt_state.go | 11 +- pkg/node/node.go | 3 +- pkg/node/peers/peer_manager.go | 5 +- 8 files changed, 485 insertions(+), 260 deletions(-) diff --git a/cmd/node/node.go b/cmd/node/node.go index 86115ee2c..7b3938bff 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -3,9 +3,10 @@ package main import ( "context" "crypto/rand" - "errors" + stderrs "errors" "flag" "fmt" + "io" "math" "math/big" "net/http" @@ -17,9 +18,11 @@ import ( "syscall" "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/sync/errgroup" "github.com/wavesplatform/gowaves/pkg/api" "github.com/wavesplatform/gowaves/pkg/crypto" @@ -50,11 +53,16 @@ import ( ) const ( - mb = 1 << 20 - defaultTimeout = 30 * time.Second - reserve = 10 + mb = 1 << 20 + defaultTimeout = 30 * time.Second + shutdownTimeout = 5 * time.Second + fileDescriptorsReserve = 10 ) +const profilerAddr = "localhost:6060" + +const utxPoolMaxSizeBytes = 1024 * mb + var defaultPeers = map[string]string{ "mainnet": "34.253.153.4:6868,168.119.116.189:6868,135.181.87.72:6868,162.55.39.115:6868,168.119.155.201:6868", "testnet": "159.69.126.149:6868,94.130.105.239:6868,159.69.126.153:6868,94.130.172.201:6868,35.157.247.122:6868", @@ -62,6 +70,8 @@ var defaultPeers = map[string]string{ } type config struct { + isParsed bool + logLevel zapcore.Level logDevelopment bool logNetwork bool @@ -90,7 +100,7 @@ type config struct { disableOutgoingConnections bool minerVoteFeatures string disableBloomFilter bool - reward string + reward int64 obsolescencePeriod time.Duration walletPath string walletPassword string @@ -102,13 +112,29 @@ type config struct { metricsID int metricsURL string dropPeers bool - dbFileDescriptors int + dbFileDescriptors uint newConnectionsLimit int disableNTP bool microblockInterval time.Duration enableLightMode bool } +var errConfigNotParsed = stderrs.New("config is not parsed") + +func (c *config) StatePath() (string, error) { + if !c.isParsed { + return "", errConfigNotParsed + } + if path := c.statePath; path != "" { + return path, nil + } + path, err := common.GetStatePath() + if err != nil { + return "", errors.Wrap(err, "failed to get common state path") + } + return path, nil +} + func (c *config) logParameters() { zap.S().Debugf("log-level: %s", c.logLevel) zap.S().Debugf("log-dev: %t", c.logDevelopment) @@ -146,6 +172,10 @@ func (c *config) logParameters() { } func (c *config) parse() { + if c.isParsed { // no need to parse twice + return + } + defer func() { c.isParsed = true }() const ( defaultBlacklistResidenceDuration = 5 * time.Minute defaultObsolescenceDuration = 4 * time.Hour @@ -205,7 +235,7 @@ func (c *config) parse() { flag.StringVar(&c.minerVoteFeatures, "vote", "", "Miner vote features.") flag.BoolVar(&c.disableBloomFilter, "disable-bloom", false, "Disable bloom filter. Less memory usage, but decrease performance.") - flag.StringVar(&c.reward, "reward", "", "Miner reward: for example 600000000.") + flag.Int64Var(&c.reward, "reward", 0, "Miner reward: for example 600000000.") flag.DurationVar(&c.obsolescencePeriod, "obsolescence", defaultObsolescenceDuration, "Blockchain obsolescence period. Disable mining if last block older then given value.") flag.StringVar(&c.walletPath, "wallet-path", "", "Path to wallet, or ~/.waves by default.") @@ -216,7 +246,7 @@ func (c *config) parse() { "Minimum connected peers for allow mining.") flag.BoolVar(&c.disableMiner, "disable-miner", false, "Disable miner.") flag.BoolVar(&c.profiler, "profiler", false, - "Start built-in profiler on 'http://localhost:6060/debug/pprof/'.") + fmt.Sprintf("Start built-in profiler on 'http://%s/debug/pprof/'.", profilerAddr)) flag.StringVar(&c.prometheus, "prometheus", "", "Provide collected metrics by prometheus client.") flag.IntVar(&c.metricsID, "metrics-id", -1, @@ -225,7 +255,7 @@ func (c *config) parse() { "URL of InfluxDB or Telegraf in form of 'http://username:password@host:port/db'.") flag.BoolVar(&c.dropPeers, "drop-peers", false, "Drop peers storage before node start.") - flag.IntVar(&c.dbFileDescriptors, "db-file-descriptors", state.DefaultOpenFilesCacheCapacity, + flag.UintVar(&c.dbFileDescriptors, "db-file-descriptors", uint(state.DefaultOpenFilesCacheCapacity), // #nosec:G115 "Maximum allowed file descriptors count that will be used by state database.") flag.IntVar(&c.newConnectionsLimit, "new-connections-limit", defaultNewConnectionLimit, "Number of new outbound connections established simultaneously, defaults to 10. Should be positive. "+ @@ -240,6 +270,20 @@ func (c *config) parse() { c.logLevel = *l } +func loggerSetup(nc *config) func() { + logger := logging.SetupLogger(nc.logLevel, + logging.DevelopmentFlag(nc.logDevelopment), + logging.NetworkFilter(nc.logNetwork), + logging.NetworkDataFilter(nc.logNetworkData), + logging.FSMFilter(nc.logFSM), + ) + return func() { + if err := logger.Sync(); err != nil && stderrs.Is(err, os.ErrInvalid) { + panic(fmt.Sprintf("Failed to close logging subsystem: %v\n", err)) + } + } +} + type Scheduler interface { Mine() chan scheduler.Emit types.Scheduler @@ -247,62 +291,55 @@ type Scheduler interface { } func main() { + os.Exit(realMain()) // for more info see https://github.com/golang/go/issues/42078 +} + +func realMain() int { nc := new(config) nc.parse() - logger := logging.SetupLogger(nc.logLevel, - logging.DevelopmentFlag(nc.logDevelopment), - logging.NetworkFilter(nc.logNetwork), - logging.NetworkDataFilter(nc.logNetworkData), - logging.FSMFilter(nc.logFSM), - ) + syncFn := loggerSetup(nc) + defer syncFn() + err := run(nc) + if err != nil { + zap.S().Errorf("Failed to run: %v", err) + return 1 + } + return 0 +} + +func run(nc *config) (retErr error) { + errg, ctx := errgroup.WithContext(context.Background()) defer func() { - err := logger.Sync() - if err != nil && errors.Is(err, os.ErrInvalid) { - panic(fmt.Sprintf("Failed to close logging subsystem: %v\n", err)) + if wErr := errg.Wait(); !errors.Is(wErr, context.Canceled) { + retErr = stderrs.Join(retErr, wErr) } }() + ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + defer cancel() - zap.S().Infof("Gowaves Node version: %s", versioning.Version) - - maxFDs, err := fdlimit.MaxFDs() - if err != nil { - zap.S().Fatalf("Initialization failure: %v", err) - } - _, err = fdlimit.RaiseMaxFDs(maxFDs) - if err != nil { - zap.S().Fatalf("Initialization failure: %v", err) + if nc.profiler { + errg.Go(func() error { + <-runProfiler(ctx) + return nil + }) } - if m := int(maxFDs) - int(nc.limitAllConnections) - reserve; nc.dbFileDescriptors > m { - zap.S().Fatalf( - "Invalid 'db-file-descriptors' flag value (%d). Value shall be less or equal to %d.", - nc.dbFileDescriptors, m) + if nc.prometheus != "" { + errg.Go(func() error { + <-runPrometheusMetricsServer(ctx, nc.prometheus) + return nil + }) } - if nc.profiler { - zap.S().Infof("Starting built-in profiler on 'http://localhost:6060/debug/pprof/'") - go func() { - pprofMux := http.NewServeMux() - // taken from "net/http/pprof" init() - pprofMux.HandleFunc("/debug/pprof/", pprof.Index) - pprofMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - pprofMux.HandleFunc("/debug/pprof/profile", pprof.Profile) - pprofMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - pprofMux.HandleFunc("/debug/pprof/trace", pprof.Trace) - s := &http.Server{ - Addr: "localhost:6060", - Handler: pprofMux, - ReadHeaderTimeout: defaultTimeout, - ReadTimeout: defaultTimeout, - } - zap.S().Warn(s.ListenAndServe()) - }() - } - - ctx, done := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer done() + zap.S().Infof("Gowaves Node version: %s", versioning.Version) + + nc.logParameters() // print all parsed parameters + + if err := raiseToMaxFDs(nc); err != nil { // raiseToMaxFDs raises the limit of file descriptors + return errors.Wrap(err, "failed to raise file descriptors limit") + } if nc.metricsURL != "" && nc.metricsID != -1 { - err = metrics.Start(ctx, nc.metricsID, nc.metricsURL) + err := metrics.Start(ctx, nc.metricsID, nc.metricsURL) if err != nil { zap.S().Warnf("Metrics reporting failed to start: %v", err) zap.S().Warn("Proceeding without reporting any metrics") @@ -311,135 +348,382 @@ func main() { } } - nc.logParameters() + nodeCloser, err := runNode(ctx, nc) + if err != nil { + return errors.Wrap(err, "failed to run node") + } - var cfg *settings.BlockchainSettings - if nc.cfgPath != "" { - var f *os.File - f, err = os.Open(nc.cfgPath) - if err != nil { - zap.S().Fatalf("Failed to open configuration file: %v", err) - } - defer func() { _ = f.Close() }() - cfg, err = settings.ReadBlockchainSettings(f) - if err != nil { - zap.S().Fatalf("Failed to read configuration file: %v", err) - } - } else { - cfg, err = settings.BlockchainSettingsByTypeName(nc.blockchainType) - if err != nil { - zap.S().Errorf("Failed to get blockchain settings: %v", err) - return - } + <-ctx.Done() + zap.S().Info("User termination in progress...") + defer func() { <-time.After(1 * time.Second) }() // give some time to close internal node processes + if clErr := nodeCloser.Close(); clErr != nil { + return errors.Wrap(clErr, "failed to close node") } + return nil +} - conf := &settings.NodeSettings{} - err = settings.ApplySettings(conf, FromArgs(cfg.AddressSchemeCharacter, nc), settings.FromJavaEnviron) +func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) { + cfg, err := blockchainSettings(nc) if err != nil { - zap.S().Errorf("Failed to apply node settings: %v", err) - return + return nil, errors.Wrap(err, "failed to get blockchain settings") } - err = conf.Validate() + conf, err := nodeSettings(nc, cfg.AddressSchemeCharacter) if err != nil { - zap.S().Errorf("Failed to validate node settings: %v", err) - return + return nil, errors.Wrap(err, "failed to get node settings") } - var wal types.EmbeddedWallet = wallet.NewEmbeddedWallet(wallet.NewLoader(nc.walletPath), - wallet.NewWallet(), cfg.AddressSchemeCharacter) - if nc.walletPassword != "" { - if err = wal.Load([]byte(nc.walletPassword)); err != nil { - zap.S().Errorf("Failed to load wallet: %v", err) - return - } + wal, err := embeddedWallet(nc, cfg.AddressSchemeCharacter) + if err != nil { + return nil, errors.Wrap(err, "failed to get embedded wallet") } - path := nc.statePath - if path == "" { - path, err = common.GetStatePath() - if err != nil { - zap.S().Errorf("Failed to get state path: %v", err) - return - } + path, err := nc.StatePath() + if err != nil { + return nil, errors.Wrap(err, "failed to get state path") + } + + ntpTime, err := getNtp(ctx, nc.disableNTP) + if err != nil { + return nil, errors.Wrap(err, "failed to get NTP time") } - reward, err := miner.ParseReward(nc.reward) + params, err := stateParams(nc, ntpTime) if err != nil { - zap.S().Errorf("Failed to parse '-reward': %v", err) - return + return nil, errors.Wrap(err, "failed to create state parameters") } - ntpTime, err := getNtp(ctx, nc.disableNTP) + st, err := state.NewState(path, true, params, cfg, nc.enableLightMode) if err != nil { - zap.S().Errorf("Failed to get NTP time: %v", err) - return + return nil, errors.Wrap(err, "failed to initialize node's state") + } + defer func() { retErr = closeIfErrorf(st, retErr, "failed to close state") }() + + features, err := minerFeatures(st, nc.minerVoteFeatures) + if err != nil { + return nil, errors.Wrap(err, "failed to parse and validate miner features") + } + + // Check if we need to start serving extended API right now. + if eapiErr := node.MaybeEnableExtendedApi(st, ntpTime); eapiErr != nil { + return nil, errors.Wrap(eapiErr, "failed to enable extended API") + } + + parent := peer.NewParent(nc.enableLightMode) + declAddr := proto.NewTCPAddrFromString(conf.DeclaredAddr) + + peerManager, err := createPeerManager(nc, conf, parent, declAddr) + if err != nil { + return nil, errors.Wrap(err, "failed to create peer manager") + } + defer func() { retErr = closeIfErrorf(peerManager, retErr, "failed to close peer manager") }() + go peerManager.Run(ctx) + + minerScheduler, err := newMinerScheduler(nc, st, wal, cfg, ntpTime, peerManager) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize miner scheduler") } + svs, err := createServices(nc, st, wal, cfg, ntpTime, peerManager, parent, minerScheduler) + if err != nil { + return nil, errors.Wrap(err, "failed to create services") + } + + app, err := api.NewApp(nc.apiKey, minerScheduler, svs) + if err != nil { + return nil, errors.Wrap(err, "failed to initialize application") + } + + if pErr := spawnPeersByAddresses(ctx, conf.Addresses, peerManager); pErr != nil { + return nil, errors.Wrap(pErr, "failed to spawn peers by addresses") + } + + if apiErr := runAPIs(ctx, nc, conf, app, svs); apiErr != nil { + return nil, errors.Wrap(apiErr, "failed to run APIs") + } + + return startNode(ctx, nc, svs, features, minerScheduler, parent, declAddr), nil +} + +func startNode( + ctx context.Context, + nc *config, + svs services.Services, + features miner.Features, + minerScheduler Scheduler, + parent peer.Parent, + declAddr proto.TCPAddr, +) *node.Node { + bindAddr := proto.NewTCPAddrFromString(nc.bindAddress) + + mine := miner.NewMicroblockMiner(svs, features, nc.reward) + go miner.Run(ctx, mine, minerScheduler, svs.InternalChannel) + + ntw, networkInfoCh := network.NewNetwork(svs, parent, nc.obsolescencePeriod) + go ntw.Run(ctx) + + n := node.NewNode(svs, declAddr, bindAddr, nc.microblockInterval, nc.enableLightMode) + go n.Run(ctx, parent, svs.InternalChannel, networkInfoCh, ntw.SyncPeer()) + + go minerScheduler.Reschedule() // Reschedule mining after node start + + return n +} + +func raiseToMaxFDs(nc *config) error { + maxFDs, err := fdlimit.MaxFDs() + if err != nil { + return errors.Wrap(err, "failed to get max FDs") + } + _, err = fdlimit.RaiseMaxFDs(maxFDs) + if err != nil { + return errors.Wrap(err, "failed to raise max FDs") + } + if m := maxFDs - uint64(nc.limitAllConnections) - fileDescriptorsReserve; uint64(nc.dbFileDescriptors) > m { + return errors.Errorf("invalid 'db-file-descriptors' flag value (%d), value shall be less or equal to %d", + nc.dbFileDescriptors, m, + ) + } + return nil +} + +func blockchainSettings(nc *config) (_ *settings.BlockchainSettings, retErr error) { + if nc.cfgPath == "" { + cfg, err := settings.BlockchainSettingsByTypeName(nc.blockchainType) + if err != nil { + return nil, errors.Wrap(err, "failed to get blockchain settings") + } + return cfg, nil + } + f, err := os.Open(nc.cfgPath) + if err != nil { + return nil, errors.Wrap(err, "failed to open configuration file") + } + defer func() { + if clErr := f.Close(); clErr != nil { + retErr = stderrs.Join(retErr, errors.Wrap(clErr, "failed to close configuration file")) + } + }() + cfg, err := settings.ReadBlockchainSettings(io.LimitReader(f, mb)) + if err != nil { + return nil, errors.Wrap(err, "failed to read configuration file") + } + return cfg, nil +} + +func stateParams(nc *config, ntpTime types.Time) (state.StateParams, error) { + dbFileDescriptors := nc.dbFileDescriptors + if dbFileDescriptors > math.MaxInt { + return state.StateParams{}, errors.Errorf("too big 'db-file-descriptors' flag value (%d)", + nc.dbFileDescriptors, + ) + } params := state.DefaultStateParams() - params.StorageParams.DbParams.OpenFilesCacheCapacity = nc.dbFileDescriptors + params.StorageParams.DbParams.OpenFilesCacheCapacity = int(dbFileDescriptors) params.StoreExtendedApiData = nc.buildExtendedAPI params.ProvideExtendedApi = nc.serveExtendedAPI params.BuildStateHashes = nc.buildStateHashes params.Time = ntpTime params.DbParams.BloomFilterParams.Disable = nc.disableBloomFilter + return params, nil +} - st, err := state.NewState(path, true, params, cfg, nc.enableLightMode) - if err != nil { - zap.S().Errorf("Failed to initialize node's state: %v", err) - return +func runProfiler(ctx context.Context) <-chan struct{} { + pprofMux := http.NewServeMux() + // taken from "net/http/pprof" init() + pprofMux.HandleFunc("/debug/pprof/", pprof.Index) + pprofMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + pprofMux.HandleFunc("/debug/pprof/profile", pprof.Profile) + pprofMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + pprofMux.HandleFunc("/debug/pprof/trace", pprof.Trace) + s := &http.Server{ + Addr: profilerAddr, + Handler: pprofMux, + ReadHeaderTimeout: defaultTimeout, + ReadTimeout: defaultTimeout, } + s.RegisterOnShutdown(func() { + zap.S().Info("Profiler is shutting down...") + }) + go func() { + zap.S().Infof("Starting built-in profiler on 'http://%s/debug/pprof/'", profilerAddr) + err := s.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + zap.S().Errorf("Failed to start profiler: %v", err) + } + }() + done := make(chan struct{}) + go func() { + defer close(done) + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer cancel() + if err := s.Shutdown(shutdownCtx); err != nil { + zap.S().Errorf("Failed to shutdown profiler: %v", err) + } + }() + return done +} - features, err := miner.ParseVoteFeatures(nc.minerVoteFeatures) +func runPrometheusMetricsServer(ctx context.Context, prometheusAddr string) <-chan struct{} { + h := http.NewServeMux() + h.Handle("/metrics", promhttp.Handler()) + s := &http.Server{ + Addr: prometheusAddr, + Handler: h, + ReadHeaderTimeout: defaultTimeout, + ReadTimeout: defaultTimeout, + } + s.RegisterOnShutdown(func() { + zap.S().Info("Prometheus metrics server is shutting down...") + }) + go func() { + zap.S().Infof("Starting prometheus metrics server on '%s'", prometheusAddr) + err := s.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + zap.S().Errorf("Failed to start prometheus metrics server: %v", err) + } + }() + done := make(chan struct{}) + go func() { + defer close(done) + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) + defer cancel() + if err := s.Shutdown(shutdownCtx); err != nil { + zap.S().Errorf("Failed to shutdown prometheus: %v", err) + } + }() + return done +} + +func runGRPCServer(ctx context.Context, addr string, nc *config, svs services.Services) error { + srv, srvErr := server.NewServer(svs) + if srvErr != nil { + return errors.Wrap(srvErr, "failed to create gRPC server") + } + go func() { + if runErr := srv.Run(ctx, addr, grpcAPIRunOptsFromCLIFlags(nc)); runErr != nil { + zap.S().Errorf("grpcServer.Run(): %v", runErr) + } + }() + return nil +} + +func nodeSettings(nc *config, scheme proto.Scheme) (*settings.NodeSettings, error) { + conf := &settings.NodeSettings{} + err := settings.ApplySettings(conf, FromArgs(scheme, nc), settings.FromJavaEnviron) if err != nil { - zap.S().Errorf("Failed to parse '-vote': %v", err) - return + return nil, errors.Wrap(err, "failed to apply node settings") } - features, err = miner.ValidateFeatures(st, features) + err = conf.Validate() if err != nil { - zap.S().Errorf("Failed to validate features: %v", err) - return + return nil, errors.Wrap(err, "failed to validate node settings") } + return conf, nil +} - // Check if we need to start serving extended API right now. - if err := node.MaybeEnableExtendedApi(st, ntpTime); err != nil { - zap.S().Errorf("Failed to enable extended API: %v", err) - return +func embeddedWallet(nc *config, scheme proto.Scheme) (types.EmbeddedWallet, error) { + wal := wallet.NewEmbeddedWallet(wallet.NewLoader(nc.walletPath), wallet.NewWallet(), scheme) + if nc.walletPassword != "" { + if err := wal.Load([]byte(nc.walletPassword)); err != nil { + return nil, errors.Wrap(err, "failed to load wallet") + } } + return wal, nil +} - declAddr := proto.NewTCPAddrFromString(conf.DeclaredAddr) - bindAddr := proto.NewTCPAddrFromString(nc.bindAddress) +func spawnPeersByAddresses(ctx context.Context, addressesByComma string, pm *peers.PeerManagerImpl) error { + if addressesByComma == "" { // That means that we don't have any peers to connect to + return nil + } + addresses := strings.Split(addressesByComma, ",") + for _, addr := range addresses { + tcpAddr := proto.NewTCPAddrFromString(addr) + if tcpAddr.Empty() { + // That means that configuration parameter is invalid + return errors.Errorf("Failed to parse TCPAddr from string %q", tcpAddr.String()) + } + if pErr := pm.AddAddress(ctx, tcpAddr); pErr != nil { + // That means that we have problems with peers storage + return errors.Wrapf(pErr, "failed to add address %q into known peers storage", tcpAddr.String()) + } + } + return nil +} - utxValidator, err := utxpool.NewValidator(st, ntpTime, nc.obsolescencePeriod) +func newMinerScheduler( + nc *config, + st state.State, + wal types.EmbeddedWallet, + cfg *settings.BlockchainSettings, + ntpTime types.Time, + peerManager peers.PeerManager, +) (Scheduler, error) { + if nc.disableMiner { + return scheduler.DisabledScheduler{}, nil + } + consensus := scheduler.NewMinerConsensus(peerManager, nc.minPeersMining) + ms, err := scheduler.NewScheduler(st, wal, cfg, ntpTime, consensus, nc.obsolescencePeriod) if err != nil { - zap.S().Errorf("Failed to initialize UTX: %v", err) - return + return nil, errors.Wrap(err, "failed to initialize miner scheduler") } - utx := utxpool.New(uint64(1024*mb), utxValidator, cfg) - parent := peer.NewParent(nc.enableLightMode) + return ms, nil +} + +func minerFeatures(st state.State, minerVoteFeaturesByComma string) (miner.Features, error) { + features, err := miner.ParseVoteFeatures(minerVoteFeaturesByComma) + if err != nil { + return nil, errors.Wrap(err, "failed to parse '-vote'") + } + + features, err = miner.ValidateFeatures(st, features) + if err != nil { + return nil, errors.Wrap(err, "failed to validate features") + } + return features, nil +} +func closeIfErrorf(closer io.Closer, retErr error, format string, args ...interface{}) error { + if retErr != nil { + if clErr := closer.Close(); clErr != nil { + return stderrs.Join(retErr, errors.Wrapf(clErr, format, args...)) + } + } + return retErr +} + +func createPeerManager( + nc *config, + conf *settings.NodeSettings, + parent peer.Parent, + declAddr proto.TCPAddr, +) (*peers.PeerManagerImpl, error) { nodeNonce, err := rand.Int(rand.Reader, new(big.Int).SetUint64(math.MaxInt32)) if err != nil { - zap.S().Errorf("Failed to get node's nonce: %v", err) - return + return nil, errors.Wrap(err, "failed to get node's nonce") } - peerSpawnerImpl := peers.NewPeerSpawner(parent, conf.WavesNetwork, declAddr, nc.nodeName, - nodeNonce.Uint64(), proto.ProtocolVersion()) + + peerSpawnerImpl := peers.NewPeerSpawner( + parent, + conf.WavesNetwork, + declAddr, + nc.nodeName, + nodeNonce.Uint64(), + proto.ProtocolVersion(), + ) peerStorage, err := peersPersistentStorage.NewCBORStorage(nc.statePath, time.Now()) if err != nil { - zap.S().Errorf("Failed to open or create peers storage: %v", err) - return + return nil, errors.Wrap(err, "failed to open or create peers storage") } if nc.dropPeers { if err := peerStorage.DropStorage(); err != nil { - zap.S().Errorf("Failed to drop peers storage. Drop peers storage manually. Err: %v", err) - return + return nil, errors.Wrap(err, "failed to drop peers storage (drop peers storage manually)") } zap.S().Info("Successfully dropped peers storage") } - - peerManager := peers.NewPeerManager( + return peers.NewPeerManager( peerSpawnerImpl, peerStorage, int(nc.limitAllConnections/2), @@ -448,34 +732,29 @@ func main() { !nc.disableOutgoingConnections, nc.newConnectionsLimit, nc.blackListResidenceTime, - ) - go peerManager.Run(ctx) + ), nil +} - var minerScheduler Scheduler - if nc.disableMiner { - minerScheduler = scheduler.DisabledScheduler{} - } else { - minerScheduler, err = scheduler.NewScheduler( - st, - wal, - cfg, - ntpTime, - scheduler.NewMinerConsensus(peerManager, nc.minPeersMining), - nc.obsolescencePeriod, - ) - if err != nil { - zap.S().Errorf("Failed to initialize miner scheduler: %v", err) - return - } +func createServices( + nc *config, + st state.State, + wal types.EmbeddedWallet, + cfg *settings.BlockchainSettings, + ntpTime types.Time, + peerManager peers.PeerManager, + parent peer.Parent, + scheduler Scheduler, +) (services.Services, error) { + utxValidator, err := utxpool.NewValidator(st, ntpTime, nc.obsolescencePeriod) + if err != nil { + return services.Services{}, errors.Wrap(err, "failed to initialize UTX") } - blockApplier := blocks_applier.NewBlocksApplier() - - svs := services.Services{ + return services.Services{ State: st, Peers: peerManager, - Scheduler: minerScheduler, - BlocksApplier: blockApplier, - UtxPool: utx, + Scheduler: scheduler, + BlocksApplier: blocks_applier.NewBlocksApplier(), + UtxPool: utxpool.New(utxPoolMaxSizeBytes, utxValidator, cfg), Scheme: cfg.AddressSchemeCharacter, Time: ntpTime, Wallet: wal, @@ -483,82 +762,30 @@ func main() { InternalChannel: messages.NewInternalChannel(), MinPeersMining: nc.minPeersMining, SkipMessageList: parent.SkipMessageList, - } - - mine := miner.NewMicroblockMiner(svs, features, reward) - go miner.Run(ctx, mine, minerScheduler, svs.InternalChannel) - - ntw, networkInfoCh := network.NewNetwork(svs, parent, nc.obsolescencePeriod) - go ntw.Run(ctx) - - n := node.NewNode(svs, declAddr, bindAddr, nc.microblockInterval, nc.enableLightMode) - go n.Run(ctx, parent, svs.InternalChannel, networkInfoCh, ntw.SyncPeer()) + }, nil +} - go minerScheduler.Reschedule() - - if len(conf.Addresses) > 0 { - addresses := strings.Split(conf.Addresses, ",") - for _, addr := range addresses { - tcpAddr := proto.NewTCPAddrFromString(addr) - if tcpAddr.Empty() { - // That means that configuration parameter is invalid - zap.S().Errorf("Failed to parse TCPAddr from string %q", tcpAddr.String()) - return - } - if pErr := peerManager.AddAddress(ctx, tcpAddr); pErr != nil { - // That means that we have problems with peers storage - zap.S().Errorf("Failed to add addres into know peers storage: %v", pErr) - return - } +func runAPIs( + ctx context.Context, + nc *config, + conf *settings.NodeSettings, + app *api.App, + svs services.Services, +) error { + if nc.enableGrpcAPI { + if sErr := runGRPCServer(ctx, conf.GrpcAddr, nc, svs); sErr != nil { + return errors.Wrap(sErr, "failed to run gRPC server") } } - app, err := api.NewApp(nc.apiKey, minerScheduler, svs) - if err != nil { - zap.S().Errorf("Failed to initialize application: %v", err) - return - } - - webApi := api.NewNodeApi(app, st, n) + webAPI := api.NewNodeAPI(app, svs.State) go func() { zap.S().Infof("Starting node HTTP API on '%v'", conf.HttpAddr) - if runErr := api.Run(ctx, conf.HttpAddr, webApi, apiRunOptsFromCLIFlags(nc)); runErr != nil { + if runErr := api.Run(ctx, conf.HttpAddr, webAPI, apiRunOptsFromCLIFlags(nc)); runErr != nil { zap.S().Errorf("Failed to start API: %v", runErr) } }() - - go func() { - if nc.prometheus != "" { - h := http.NewServeMux() - h.Handle("/metrics", promhttp.Handler()) - s := &http.Server{ - Addr: nc.prometheus, - Handler: h, - ReadHeaderTimeout: defaultTimeout, - ReadTimeout: defaultTimeout, - } - zap.S().Infof("Starting node metrics endpoint on '%v'", nc.prometheus) - _ = s.ListenAndServe() - } - }() - - if nc.enableGrpcAPI { - srv, srvErr := server.NewServer(svs) - if srvErr != nil { - zap.S().Errorf("Failed to create gRPC server: %v", srvErr) - return - } - go func() { - if runErr := srv.Run(ctx, conf.GrpcAddr, grpcAPIRunOptsFromCLIFlags(nc)); runErr != nil { - zap.S().Errorf("grpcServer.Run(): %v", runErr) - } - }() - } - - <-ctx.Done() - zap.S().Info("User termination in progress...") - n.Close() - <-time.After(1 * time.Second) + return nil } func FromArgs(scheme proto.Scheme, c *config) func(s *settings.NodeSettings) error { diff --git a/pkg/api/node_api.go b/pkg/api/node_api.go index 1759448c6..4c115ac71 100644 --- a/pkg/api/node_api.go +++ b/pkg/api/node_api.go @@ -18,28 +18,26 @@ import ( apiErrs "github.com/wavesplatform/gowaves/pkg/api/errors" "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/errs" - "github.com/wavesplatform/gowaves/pkg/node" "github.com/wavesplatform/gowaves/pkg/proto" "github.com/wavesplatform/gowaves/pkg/state" "github.com/wavesplatform/gowaves/pkg/util/limit_listener" ) const ( - defaultTimeout = 30 * time.Second - postMessageSizeLimit int64 = 1 << 20 // 1 MB - maxDebugMessageLength = 100 + defaultTimeout = 30 * time.Second + defaultShutdownTimeout = 5 * time.Second + postMessageSizeLimit int64 = 1 << 20 // 1 MB + maxDebugMessageLength = 100 ) type NodeApi struct { state state.State - node *node.Node app *App } -func NewNodeApi(app *App, state state.State, node *node.Node) *NodeApi { +func NewNodeAPI(app *App, state state.State) *NodeApi { return &NodeApi{ state: state, - node: node, app: app, } } @@ -415,12 +413,19 @@ func Run(ctx context.Context, address string, n *NodeApi, opts *RunOptions) erro } apiServer := &http.Server{Addr: address, Handler: routes, ReadHeaderTimeout: defaultTimeout, ReadTimeout: defaultTimeout} + apiServer.RegisterOnShutdown(func() { + zap.S().Info("Shutting down API server ...") + }) + done := make(chan struct{}) + defer func() { <-done }() // wait for server shutdown go func() { + defer close(done) <-ctx.Done() - zap.S().Info("Shutting down API...") - err := apiServer.Shutdown(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - zap.S().Errorf("Failed to shutdown API server: %v", err) + shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + defer cancel() + sErr := apiServer.Shutdown(shutdownCtx) + if sErr != nil { + zap.S().Errorf("Failed to shutdown API server: %v", sErr) } }() @@ -442,7 +447,7 @@ func Run(ctx context.Context, address string, n *NodeApi, opts *RunOptions) erro err = apiServer.ListenAndServe() } - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { return err } return nil diff --git a/pkg/miner/features.go b/pkg/miner/features.go index ca679abb7..a5cbd6f3b 100644 --- a/pkg/miner/features.go +++ b/pkg/miner/features.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/pkg/errors" + "github.com/wavesplatform/gowaves/pkg/settings" ) @@ -42,13 +43,6 @@ func parseFeature(s string) (settings.Feature, error) { return settings.Feature(u), nil } -func ParseReward(s string) (int64, error) { - if s == "" { - return 0, nil - } - return strconv.ParseInt(s, 10, 64) -} - type featureState interface { IsActivated(featureID int16) (bool, error) IsApproved(featureID int16) (bool, error) diff --git a/pkg/miner/features_test.go b/pkg/miner/features_test.go index 95aea0b73..2f72c67c8 100644 --- a/pkg/miner/features_test.go +++ b/pkg/miner/features_test.go @@ -29,13 +29,3 @@ func TestParseVoteFeaturesFailure(t *testing.T) { _, err := ParseVoteFeatures(s2) require.Error(t, err) } - -func TestParseReward(t *testing.T) { - rs, err := ParseReward("") - require.NoError(t, err) - require.EqualValues(t, 0, rs) - - rs, err = ParseReward("100500") - require.NoError(t, err) - require.EqualValues(t, 100500, rs) -} diff --git a/pkg/mock/peer_manager.go b/pkg/mock/peer_manager.go index 030e8ce5d..3c66392f0 100644 --- a/pkg/mock/peer_manager.go +++ b/pkg/mock/peer_manager.go @@ -122,9 +122,11 @@ func (mr *MockPeerManagerMockRecorder) ClearBlackList() *gomock.Call { } // Close mocks base method. -func (m *MockPeerManager) Close() { +func (m *MockPeerManager) Close() error { m.ctrl.T.Helper() - m.ctrl.Call(m, "Close") + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 } // Close indicates an expected call of Close. diff --git a/pkg/node/fsm/halt_state.go b/pkg/node/fsm/halt_state.go index 2d2722a76..bff059a6c 100644 --- a/pkg/node/fsm/halt_state.go +++ b/pkg/node/fsm/halt_state.go @@ -2,7 +2,9 @@ package fsm import ( "context" + stderrs "errors" + "github.com/pkg/errors" "github.com/qmuntal/stateless" "go.uber.org/zap" @@ -24,17 +26,20 @@ func (a *HaltState) Errorf(err error) error { func newHaltState(info BaseInfo) (State, Async, error) { zap.S().Named(logging.FSMNamespace).Debugf("[Halt] Entered the Halt state") - info.peers.Close() + var errs []error + if err := info.peers.Close(); err != nil { + errs = append(errs, errors.Wrap(err, "failed to close peers")) + } zap.S().Named(logging.FSMNamespace).Debugf("[Halt] Peers closed") err := info.storage.Close() if err != nil { - return nil, nil, err + errs = append(errs, errors.Wrap(err, "failed to close storage")) } zap.S().Named(logging.FSMNamespace).Debugf("[Halt] Storage closed") info.syncPeer.Clear() return &HaltState{ baseInfo: info, - }, nil, nil + }, nil, stderrs.Join(errs...) } func initHaltStateInFSM(_ *StateData, fsm *stateless.StateMachine, info BaseInfo) { diff --git a/pkg/node/node.go b/pkg/node/node.go index dfc73b6dd..f7dad4059 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -68,10 +68,11 @@ func NewNode( } } -func (a *Node) Close() { +func (a *Node) Close() error { ch := make(chan struct{}) a.services.InternalChannel <- messages.NewHaltMessage(ch) <-ch + return nil } func (a *Node) SpawnOutgoingConnections(ctx context.Context) { diff --git a/pkg/node/peers/peer_manager.go b/pkg/node/peers/peer_manager.go index b0942c6cf..c5c98b317 100644 --- a/pkg/node/peers/peer_manager.go +++ b/pkg/node/peers/peer_manager.go @@ -47,7 +47,7 @@ type PeerManager interface { UpdateScore(p peer.Peer, score *proto.Score) error KnownPeers() []storage.KnownPeer UpdateKnownPeers([]storage.KnownPeer) error - Close() + Close() error SpawnOutgoingConnections(context.Context) SpawnIncomingConnection(ctx context.Context, conn net.Conn) error Spawned() []proto.IpPort @@ -246,7 +246,7 @@ func (a *PeerManagerImpl) UpdateKnownPeers(known []storage.KnownPeer) error { return nil } -func (a *PeerManagerImpl) Close() { +func (a *PeerManagerImpl) Close() error { a.mu.Lock() defer a.mu.Unlock() @@ -255,6 +255,7 @@ func (a *PeerManagerImpl) Close() { _ = info.peer.Close() }, ) + return nil } func (a *PeerManagerImpl) SpawnOutgoingConnections(ctx context.Context) { From 1381003218d7b873fdbeb05b224c775d6de4e469 Mon Sep 17 00:00:00 2001 From: Alexey Kiselev Date: Thu, 31 Oct 2024 16:15:44 +0400 Subject: [PATCH 8/9] =?UTF-8?q?Flag=20to=20validate=20transactions=20after?= =?UTF-8?q?=20deserialization=20added=20to=20convert=20=E2=80=A6=20(#1525)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Flag to validate transactions after deserialization added to convert utility. * Linter issues fixed. --------- Co-authored-by: Nikolay Eskov --- cmd/convert/config.go | 18 +++++++++++------- cmd/convert/main.go | 11 +++++++++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/cmd/convert/config.go b/cmd/convert/config.go index af6f9ffc1..6c6742dc9 100644 --- a/cmd/convert/config.go +++ b/cmd/convert/config.go @@ -20,22 +20,26 @@ type config struct { toJSON bool toBinary bool base64 bool + validate bool } func (c *config) parse() error { var ( scheme, privateKey, in, out string ) - flag.StringVar(&scheme, "scheme", "W", "Network scheme byte. Defaults to 'W' (MainNet).") + flag.StringVar(&scheme, "scheme", "W", "Specifies the network scheme byte. Defaults to 'W' (MainNet).") flag.BoolVar(&c.toJSON, "to-json", false, - "Convert the transaction to JSON representation. Sign the transaction if a private key is provided.") + "Converts the transaction to JSON format. Signs the transaction if a private key is provided.") flag.BoolVar(&c.toBinary, "to-binary", false, - "Convert the transaction to binary representation. Sign the transaction if a private key is provided.") - flag.BoolVar(&c.base64, "base64", false, "Use Base64 as the binary transaction encoding.") + "Converts the transaction to binary format. Signs the transaction if a private key is provided.") + flag.BoolVar(&c.base64, "base64", false, "Encodes the binary transaction in Base64.") flag.StringVar(&privateKey, "private-key", "", - "Private key to sign the transaction. Please provide the key in Base58 string.") - flag.StringVar(&in, "in", "", "Input file path. Defaults to empty string. If empty, reads from STDIN.") - flag.StringVar(&out, "out", "", "Output file path. Defaults to empty string. If empty, writes to STDOUT.") + "Private key for signing the transaction. Provide the key as a Base58 string.") + flag.StringVar(&in, "in", "", + "Specifies the input file path. Defaults to an empty string. If empty, reads from STDIN.") + flag.StringVar(&out, "out", "", + "Specifies the output file path. Defaults to an empty string. If empty, writes to STDOUT.") + flag.BoolVar(&c.validate, "validate", false, "Validates the transaction after deserialization.") flag.Parse() if len(scheme) != 1 { diff --git a/cmd/convert/main.go b/cmd/convert/main.go index 763af3f06..d8343fcbf 100644 --- a/cmd/convert/main.go +++ b/cmd/convert/main.go @@ -44,6 +44,17 @@ func handleJSON(data []byte, cfg config) error { if rErr != nil { return rErr } + + if cfg.validate { + vp := proto.TransactionValidationParams{ + Scheme: cfg.scheme, + CheckVersion: true, + } + _, err := tx.Validate(vp) + if err != nil { + return err + } + } tx, sErr := sign(tx, cfg) if sErr != nil { return sErr From ee55592d9e7c5685d5b61559f46b3ea9751ba786 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Fri, 1 Nov 2024 21:31:27 +0300 Subject: [PATCH 9/9] Refactored a bit main function of 'importer' utility. (#1529) --- cmd/importer/importer.go | 43 ++++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/cmd/importer/importer.go b/cmd/importer/importer.go index 5f15c4723..fca0f5fc8 100644 --- a/cmd/importer/importer.go +++ b/cmd/importer/importer.go @@ -32,11 +32,26 @@ const ( ) func main() { - err := run() + os.Exit(realMain()) // for more info see https://github.com/golang/go/issues/42078 +} + +func realMain() int { + c := parseFlags() + + logSync := c.setupLogger() + defer logSync() + + if err := c.validateFlags(); err != nil { + zap.S().Error(capitalize(err.Error())) + return 1 + } + + err := runImporter(&c) if err != nil { zap.S().Error(capitalize(err.Error())) - os.Exit(1) + return 1 } + return 0 } type cfg struct { @@ -58,7 +73,7 @@ type cfg struct { disableBloomFilter bool } -func parseFlags() (cfg, error) { +func parseFlags() cfg { const ( defaultBlocksNumber = 1000 defaultBufferSize = 16 @@ -92,18 +107,20 @@ func parseFlags() (cfg, error) { flag.BoolVar(&c.disableBloomFilter, "disable-bloom", false, "Disable bloom filter. Less memory usage, but decrease performance.") flag.Parse() + return c +} +func (c *cfg) validateFlags() error { if c.blockchainPath == "" { - return cfg{}, errors.New("option blockchain-path is not specified, please specify it") + return errors.New("option blockchain-path is not specified, please specify it") } if c.dataDirPath == "" { - return cfg{}, errors.New("option data-path is not specified, please specify it") + return errors.New("option data-path is not specified, please specify it") } if c.lightNodeMode && c.snapshotsPath == "" { - return cfg{}, errors.New("option snapshots-path is not specified in light mode, please specify it") + return errors.New("option snapshots-path is not specified in light mode, please specify it") } - - return c, nil + return nil } func (c *cfg) params(maxFDs int) state.StateParams { @@ -147,15 +164,7 @@ func (c *cfg) setupCPUProfile() (func(), error) { }, nil } -func run() error { - c, err := parseFlags() - if err != nil { - return err - } - - logSync := c.setupLogger() - defer logSync() - +func runImporter(c *cfg) error { zap.S().Infof("Gowaves Importer version: %s", versioning.Version) fds, err := riseFDLimit()