Skip to content

Commit

Permalink
Fix: teztnets relaunch (#990)
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky authored Sep 5, 2023
1 parent b941685 commit ad164ba
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 54 deletions.
18 changes: 0 additions & 18 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/cmd/api/main.go",
"args": [
"-f",
"config.yml",
"-f",
"config.dev.yml"
],
"envFile": "${workspaceFolder}/.env"
},
{
Expand All @@ -25,12 +19,6 @@
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/cmd/indexer/main.go",
"args": [
"-f",
"config.yml",
"-f",
"config.dev.yml"
],
"envFile": "${workspaceFolder}/.env"
},
{
Expand All @@ -39,12 +27,6 @@
"request": "launch",
"mode": "debug",
"program": "${workspaceFolder}/scripts/migration/main.go",
"args": [
"-f",
"config.yml",
"-f",
"config.dev.yml"
],
"envFile": "${workspaceFolder}/.env"
}
]
Expand Down
5 changes: 3 additions & 2 deletions cmd/indexer/indexer/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (

"github.com/baking-bad/bcdhub/internal/bcd/tezerrors"
"github.com/baking-bad/bcdhub/internal/config"
"github.com/dipdup-io/workerpool"
"github.com/rs/zerolog/log"
)

// CreateIndexers -
func CreateIndexers(ctx context.Context, cfg config.Config) ([]Indexer, error) {
func CreateIndexers(ctx context.Context, cfg config.Config, g workerpool.Group) ([]Indexer, error) {
if err := tezerrors.LoadErrorDescriptions(); err != nil {
return nil, err
}
Expand All @@ -27,7 +28,7 @@ func CreateIndexers(ctx context.Context, cfg config.Config) ([]Indexer, error) {
defer wg.Done()

if indexerCfg.Periodic != nil {
periodicIndexer, err := NewPeriodicIndexer(ctx, network, cfg, indexerCfg)
periodicIndexer, err := NewPeriodicIndexer(ctx, network, cfg, indexerCfg, g)
if err != nil {
log.Err(err).Msg("NewPeriodicIndexer")
return
Expand Down
18 changes: 10 additions & 8 deletions cmd/indexer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/baking-bad/bcdhub/internal/postgres"
"github.com/baking-bad/bcdhub/internal/postgres/core"
"github.com/baking-bad/bcdhub/internal/rollback"
"github.com/dipdup-io/workerpool"
"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
)
Expand All @@ -45,6 +46,8 @@ type BlockchainIndexer struct {

isPeriodic bool
indicesInit sync.Once

g workerpool.Group
}

// NewBlockchainIndexer -
Expand All @@ -69,6 +72,7 @@ func NewBlockchainIndexer(ctx context.Context, cfg config.Config, network string
Network: networkType,
isPeriodic: indexerConfig.Periodic != nil,
refreshTimer: make(chan struct{}, 10),
g: workerpool.NewGroup(),
}

if err := bi.init(ctx, bi.Context.StorageDB); err != nil {
Expand All @@ -80,6 +84,8 @@ func NewBlockchainIndexer(ctx context.Context, cfg config.Config, network string

// Close -
func (bi *BlockchainIndexer) Close() error {
bi.g.Wait()

close(bi.refreshTimer)
if err := bi.receiver.Close(); err != nil {
return nil
Expand Down Expand Up @@ -127,14 +133,11 @@ func (bi *BlockchainIndexer) init(ctx context.Context, db *core.Postgres) error
}

// Start -
func (bi *BlockchainIndexer) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

func (bi *BlockchainIndexer) Start(ctx context.Context) {
localSentry := helpers.GetLocalSentry()
helpers.SetLocalTagSentry(localSentry, "network", bi.Network.String())

wg.Add(1)
go bi.indexBlock(ctx, wg)
bi.g.GoCtx(ctx, bi.indexBlock)

bi.receiver.Start(ctx)

Expand Down Expand Up @@ -194,9 +197,7 @@ func (bi *BlockchainIndexer) setUpdateTicker(seconds int) {
bi.refreshTimer <- struct{}{}
}

func (bi *BlockchainIndexer) indexBlock(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

func (bi *BlockchainIndexer) indexBlock(ctx context.Context) {
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()

Expand Down Expand Up @@ -578,5 +579,6 @@ func (bi *BlockchainIndexer) reinit(ctx context.Context, cfg config.Config, inde
logger.Info().Str("network", bi.Context.Network.String()).Msg("Creating indexer object...")
bi.receiver = NewReceiver(bi.Context.RPC, 20, indexerConfig.ReceiverThreads)

bi.refreshTimer = make(chan struct{}, 10)
return bi.init(ctx, bi.Context.StorageDB)
}
3 changes: 1 addition & 2 deletions cmd/indexer/indexer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package indexer

import (
"context"
"sync"

"github.com/baking-bad/bcdhub/internal/noderpc"
)

// Indexer -
type Indexer interface {
Start(ctx context.Context, wg *sync.WaitGroup)
Start(ctx context.Context)
Index(ctx context.Context, head noderpc.Header) error
Rollback(ctx context.Context) error
Close() error
Expand Down
26 changes: 15 additions & 11 deletions cmd/indexer/indexer/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package indexer
import (
"context"
"errors"
"sync"
"time"

"github.com/baking-bad/bcdhub/internal/config"
"github.com/baking-bad/bcdhub/internal/logger"
"github.com/baking-bad/bcdhub/internal/models/types"
"github.com/baking-bad/bcdhub/internal/noderpc"
"github.com/baking-bad/bcdhub/internal/periodic"
"github.com/dipdup-io/workerpool"
)

// PeriodicIndexer -
Expand All @@ -21,20 +22,25 @@ type PeriodicIndexer struct {
indexerCfg config.IndexerConfig

worker *periodic.Worker

wg *sync.WaitGroup
g workerpool.Group
}

// NewPeriodicIndexer -
func NewPeriodicIndexer(ctx context.Context, network string, cfg config.Config, indexerCfg config.IndexerConfig) (*PeriodicIndexer, error) {
func NewPeriodicIndexer(
ctx context.Context,
network string,
cfg config.Config,
indexerCfg config.IndexerConfig,
g workerpool.Group,
) (*PeriodicIndexer, error) {
if indexerCfg.Periodic == nil {
return nil, errors.New("not periodic")
}

p := &PeriodicIndexer{
cfg: cfg,
indexerCfg: indexerCfg,
wg: new(sync.WaitGroup),
g: g,
}

worker, err := periodic.New(*indexerCfg.Periodic, types.NewNetwork(network), p.handleUrlChanged)
Expand All @@ -60,12 +66,10 @@ func NewPeriodicIndexer(ctx context.Context, network string, cfg config.Config,
}

// Start -
func (p *PeriodicIndexer) Start(ctx context.Context, wg *sync.WaitGroup) {

func (p *PeriodicIndexer) Start(ctx context.Context) {
indexerCtx, indexerCancel := context.WithCancel(ctx)
p.indexerCancel = indexerCancel

p.indexer.Start(indexerCtx, p.wg)
p.indexer.Start(indexerCtx)
}

// Close -
Expand All @@ -87,8 +91,8 @@ func (p *PeriodicIndexer) Rollback(ctx context.Context) error {
}

func (p *PeriodicIndexer) handleUrlChanged(ctx context.Context, network, url string) error {
logger.Warning().Str("network", network).Str("url", url).Msg("cancelling indexer due to URL changing...")
p.indexerCancel()
p.wg.Wait()

if err := p.indexer.Close(); err != nil {
return err
Expand All @@ -102,7 +106,7 @@ func (p *PeriodicIndexer) handleUrlChanged(ctx context.Context, network, url str

indexerCtx, indexerCancel := context.WithCancel(ctx)
p.indexerCancel = indexerCancel
p.indexer.Start(indexerCtx, p.wg)
p.g.GoCtx(indexerCtx, p.indexer.Start)

return nil
}
Expand Down
11 changes: 5 additions & 6 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"os"
"os/signal"
"sync"
"syscall"

"github.com/baking-bad/bcdhub/cmd/indexer/indexer"
"github.com/baking-bad/bcdhub/internal/config"
"github.com/baking-bad/bcdhub/internal/helpers"
"github.com/baking-bad/bcdhub/internal/logger"
"github.com/dipdup-io/workerpool"
"github.com/pyroscope-io/client/pyroscope"
)

Expand Down Expand Up @@ -56,10 +56,10 @@ func main() {
}
}

var wg sync.WaitGroup
g := workerpool.NewGroup()
ctx, cancel := context.WithCancel(context.Background())

indexers, err := indexer.CreateIndexers(ctx, cfg)
indexers, err := indexer.CreateIndexers(ctx, cfg, g)
if err != nil {
cancel()
logger.Err(err)
Expand All @@ -71,14 +71,13 @@ func main() {
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

for i := range indexers {
wg.Add(1)
go indexers[i].Start(ctx, &wg)
g.GoCtx(ctx, indexers[i].Start)
}

<-sigChan
cancel()

wg.Wait()
g.Wait()
for i := range indexers {
if err := indexers[i].Close(); err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/aws/aws-sdk-go v1.44.92
github.com/btcsuite/btcutil v1.0.2
github.com/dipdup-io/workerpool v0.0.3
github.com/dipdup-io/workerpool v0.0.4
github.com/ebellocchia/go-base58 v0.1.0
github.com/fatih/color v1.13.0
github.com/getsentry/sentry-go v0.13.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dipdup-io/workerpool v0.0.3 h1:+cnO0/J0e4UiJ0EBEDpvuhriSDVHlsPminGRU2Il+ZI=
github.com/dipdup-io/workerpool v0.0.3/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s=
github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
github.com/ebellocchia/go-base58 v0.1.0 h1:0w/ODEfZnOPW5KW0QY/Xpb1fxba/BxQJMUa5iYzpljk=
github.com/ebellocchia/go-base58 v0.1.0/go.mod h1:RHE/6C6Ru6YAH9Tc+A9eHQ6ZKEooLC0jw+YLnpt3CAU=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down
2 changes: 1 addition & 1 deletion internal/models/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
DocProtocol = "protocols"
DocScripts = "scripts"
DocTicketUpdates = "ticket_updates"
DocSmartRollups = "smart_rollups"
DocSmartRollups = "smart_rollup"
)

// AllDocuments - returns all document names
Expand Down
2 changes: 1 addition & 1 deletion internal/models/smart_rollup/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (sr *SmartRollup) GetID() int64 {

// GetIndex -
func (SmartRollup) GetIndex() string {
return "contracts"
return "smart_rollup"
}

// Save -
Expand Down
8 changes: 6 additions & 2 deletions internal/periodic/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (w *Worker) Start(ctx context.Context) {
logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to run cron function")
return
}

w.cron.Start()
}

Expand All @@ -82,6 +83,7 @@ func (w *Worker) handleScheduleEvent(ctx context.Context) func() {
logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to receive periodic network info")
}
if changed {
logger.Info().Msg("rpc url changed")
return
}

Expand All @@ -98,6 +100,7 @@ func (w *Worker) handleScheduleEvent(ctx context.Context) func() {
logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to receive periodic network info")
}
if changed {
logger.Info().Msg("rpc url changed")
return
}
}
Expand All @@ -122,12 +125,13 @@ func (w *Worker) checkNetwork(ctx context.Context) (bool, error) {
}

if w.currentUrl != data.RPCURL {
w.currentUrl = data.RPCURL

if w.currentUrl != "" {
if err := w.handler(ctx, w.network.String(), w.currentUrl); err != nil {
if err := w.handler(ctx, w.network.String(), data.RPCURL); err != nil {
logger.Error().Err(err).Str("network", w.network.String()).Msg("failed to apply new rpc url")
}
}
w.currentUrl = data.RPCURL

logger.Info().Str("network", parts[0]).Str("url", w.currentUrl).Msg("new url was found")
return true, nil
Expand Down

0 comments on commit ad164ba

Please sign in to comment.