From 9e75a78898610232adea68efc256671affe45a61 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 11 Dec 2024 20:04:13 -0800 Subject: [PATCH] Add ILogpoller interface and call NewLogPoller() from NewChain() Note: this will hopefully be renamed to LogPoller later, once we find a better name for the struct. (logPoller, for consistency with evm?) and the PR's this depends on have been merged, to avoid merge conflicts --- pkg/solana/chain.go | 15 +++++++++++++-- pkg/solana/logpoller/log_poller.go | 19 ++++++++++++++++--- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 630248aff..a8b7a8fe4 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -28,6 +28,7 @@ import ( mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/internal" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm" ) @@ -37,6 +38,7 @@ type Chain interface { ID() string Config() config.Config + LogPoller() logpoller.ILogPoller TxManager() TxManager // Reader returns a new Reader from the available list of nodes (if there are multiple, it will randomly select one) Reader() (client.Reader, error) @@ -89,6 +91,7 @@ type chain struct { services.StateMachine id string cfg *config.TOMLConfig + lp logpoller.ILogPoller txm *txm.Txm balanceMonitor services.Service lggr logger.Logger @@ -235,6 +238,7 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L clientCache: map[string]*verifiedCachedClient{}, } + var lc internal.Loader[client.Reader] = utils.NewLazyLoad(func() (client.Reader, error) { return ch.getClient() }) var tc internal.Loader[client.ReaderWriter] = utils.NewLazyLoad(func() (client.ReaderWriter, error) { return ch.getClient() }) var bc internal.Loader[monitor.BalanceClient] = utils.NewLazyLoad(func() (monitor.BalanceClient, error) { return ch.getClient() }) @@ -303,10 +307,13 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L return result.Signature(), result.Error() } - tc = internal.NewLoader[client.ReaderWriter](func() (client.ReaderWriter, error) { return ch.multiNode.SelectRPC() }) - bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() }) + // TODO: Can we just remove these? They nullify the lazy loaders initialized earlier, don't they? + //lc = internal.NewLoader[client.Reader](func() (client.Reader, error) { return ch.multiNode.SelectRPC() }) + //tc = internal.NewLoader[client.ReaderWriter](func() (client.ReaderWriter, error) { return ch.multiNode.SelectRPC() }) + //bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() }) } + ch.lp = logpoller.NewLogPoller(lggr, logpoller.NewORM(ch.ID(), ds, lggr), lc) ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr) ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc) return &ch, nil @@ -396,6 +403,10 @@ func (c *chain) Config() config.Config { return c.cfg } +func (c *chain) LogPoller() logpoller.ILogPoller { + return c.lp +} + func (c *chain) TxManager() TxManager { return c.txm } diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index fff6faac8..6efc91910 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/internal" ) var ( @@ -24,11 +25,18 @@ type ORM interface { MarkFilterDeleted(ctx context.Context, id int64) (err error) } +type ILogPoller interface { + Start(context.Context) error + Close() error + RegisterFilter(ctx context.Context, filter Filter) error + UnregisterFilter(ctx context.Context, name string) error +} + type LogPoller struct { services.StateMachine lggr logger.SugaredLogger orm ORM - client client.Reader + client internal.Loader[client.Reader] collector *EncodedLogCollector filters *filters @@ -38,7 +46,7 @@ type LogPoller struct { wg sync.WaitGroup } -func NewLogPoller(lggr logger.SugaredLogger, orm ORM, cl client.Reader) *LogPoller { +func NewLogPoller(lggr logger.SugaredLogger, orm ORM, cl internal.Loader[client.Reader]) ILogPoller { lggr = logger.Sugared(logger.Named(lggr, "LogPoller")) lp := LogPoller{ orm: orm, @@ -46,16 +54,21 @@ func NewLogPoller(lggr logger.SugaredLogger, orm ORM, cl client.Reader) *LogPoll lggr: lggr, filters: newFilters(lggr, orm), } - lp.collector = NewEncodedLogCollector(lp.client, lp, lp.lggr) return &lp } func (lp LogPoller) Process(event ProgramEvent) error { // process stream of events coming from event loader + return nil } func (lp *LogPoller) Start(context.Context) error { + cl, err := lp.client.Get() + if err != nil { + return err + } + lp.collector = NewEncodedLogCollector(cl, lp, lp.lggr) return lp.StartOnce("LogPoller", func() error { lp.wg.Add(2) go lp.run()