From 24dce590b260d50dc05a99080b24f6eee84fc9a1 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 28 Sep 2023 20:48:12 +0200 Subject: [PATCH] Moving observability for LogPoller closer to the database layer (#10674) * Moving observability for LogPoller closer to the database layer * Minor fixes * Post rebase fixes * Post rebase fixes --- core/chains/evm/chain.go | 2 +- core/chains/evm/logpoller/helper_test.go | 2 +- core/chains/evm/logpoller/log_poller.go | 30 +-- .../evm/logpoller/log_poller_internal_test.go | 2 +- core/chains/evm/logpoller/log_poller_test.go | 8 +- core/chains/evm/logpoller/observability.go | 206 +++++++++++++----- .../evm/logpoller/observability_test.go | 69 +++--- core/chains/evm/logpoller/orm.go | 115 +++++++--- core/chains/evm/logpoller/orm_test.go | 70 +++--- 9 files changed, 319 insertions(+), 185 deletions(-) diff --git a/core/chains/evm/chain.go b/core/chains/evm/chain.go index 8704d0c1fc8..b4986ad0c25 100644 --- a/core/chains/evm/chain.go +++ b/core/chains/evm/chain.go @@ -250,7 +250,7 @@ func newChain(ctx context.Context, cfg *evmconfig.ChainScoped, nodes []*toml.Nod if opts.GenLogPoller != nil { logPoller = opts.GenLogPoller(chainID) } else { - logPoller = logpoller.NewObservedLogPoller(logpoller.NewORM(chainID, db, l, cfg.Database()), client, l, cfg.EVM().LogPollInterval(), int64(cfg.EVM().FinalityDepth()), int64(cfg.EVM().LogBackfillBatchSize()), int64(cfg.EVM().RPCDefaultBatchSize()), int64(cfg.EVM().LogKeepBlocksDepth())) + logPoller = logpoller.NewLogPoller(logpoller.NewObservedORM(chainID, db, l, cfg.Database()), client, l, cfg.EVM().LogPollInterval(), int64(cfg.EVM().FinalityDepth()), int64(cfg.EVM().LogBackfillBatchSize()), int64(cfg.EVM().RPCDefaultBatchSize()), int64(cfg.EVM().LogKeepBlocksDepth())) } } diff --git a/core/chains/evm/logpoller/helper_test.go b/core/chains/evm/logpoller/helper_test.go index 4c764d7d74f..447a4673588 100644 --- a/core/chains/evm/logpoller/helper_test.go +++ b/core/chains/evm/logpoller/helper_test.go @@ -36,7 +36,7 @@ type TestHarness struct { Lggr logger.Logger // Chain2/ORM2 is just a dummy second chain, doesn't have a client. ChainID, ChainID2 *big.Int - ORM, ORM2 *logpoller.ORM + ORM, ORM2 *logpoller.DbORM LogPoller logpoller.LogPollerTest Client *backends.SimulatedBackend Owner *bind.TransactOpts diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index b9d1ba2e98a..f4b3a7f4f0d 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -87,7 +87,7 @@ var ( type logPoller struct { utils.StartStopOnce ec Client - orm *ORM + orm ORM lggr logger.Logger pollPeriod time.Duration // poll period set by block production rate finalityDepth int64 // finality depth is taken to mean that block (head - finality) is finalized @@ -119,7 +119,7 @@ type logPoller struct { // // How fast that can be done depends largely on network speed and DB, but even for the fastest // support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency -func NewLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, +func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepBlocksDepth int64) *logPoller { return &logPoller{ @@ -676,7 +676,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks) - err = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { + err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx)) }) if err != nil { @@ -747,7 +747,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren // the canonical set per read. Typically, if an application took action on a log // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. // Its also nicely analogous to reading from the chain itself. - err2 = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { + err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { // These deletes are bounded by reorg depth, so they are // fast and should not slow down the log readers. err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx)) @@ -844,7 +844,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int return } lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix()) - err = lp.orm.q.WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { + err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error { if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, pg.WithQueryer(tx)); err2 != nil { return err2 } @@ -937,11 +937,11 @@ func (lp *logPoller) pruneOldBlocks(ctx context.Context) error { // Logs returns logs matching topics and address (exactly) in the given block range, // which are canonical at time of query. func (lp *logPoller) Logs(start, end int64, eventSig common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectLogsByBlockRangeFilter(start, end, address, eventSig, qopts...) + return lp.orm.SelectLogs(start, end, address, eventSig, qopts...) } func (lp *logPoller) LogsWithSigs(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectLogsWithSigsByBlockRangeFilter(start, end, address, eventSigs, qopts...) + return lp.orm.SelectLogsWithSigs(start, end, address, eventSigs, qopts...) } func (lp *logPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { @@ -955,7 +955,7 @@ func (lp *logPoller) IndexedLogs(eventSig common.Hash, address common.Address, t // IndexedLogsByBlockRange finds all the logs that have a topic value in topicValues at index topicIndex within the block range func (lp *logPoller) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectIndexedLogsByBlockRangeFilter(start, end, address, eventSig, topicIndex, topicValues, qopts...) + return lp.orm.SelectIndexedLogsByBlockRange(start, end, address, eventSig, topicIndex, topicValues, qopts...) } func (lp *logPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { @@ -968,28 +968,28 @@ func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Has // LogsDataWordGreaterThan note index is 0 based. func (lp *logPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, confs, qopts...) + return lp.orm.SelectLogsDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, confs, qopts...) } // LogsDataWordRange note index is 0 based. func (lp *logPoller) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectDataWordRange(address, eventSig, wordIndex, wordValueMin, wordValueMax, confs, qopts...) + return lp.orm.SelectLogsDataWordRange(address, eventSig, wordIndex, wordValueMin, wordValueMax, confs, qopts...) } // IndexedLogsTopicGreaterThan finds all the logs that have a topic value greater than topicValueMin at index topicIndex. // Only works for integer topics. func (lp *logPoller) IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectIndexLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...) + return lp.orm.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...) } // LogsUntilBlockHashDataWordGreaterThan note index is 0 based. // If the blockhash is not found (i.e. a stale fork) it will error. func (lp *logPoller) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectUntilBlockHashDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, untilBlockHash, qopts...) + return lp.orm.SelectLogsUntilBlockHashDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, untilBlockHash, qopts...) } func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return lp.orm.SelectIndexLogsTopicRange(address, eventSig, topicIndex, topicValueMin, topicValueMax, confs, qopts...) + return lp.orm.SelectIndexedLogsTopicRange(address, eventSig, topicIndex, topicValueMin, topicValueMax, confs, qopts...) } // LatestBlock returns the latest block the log poller is on. It tracks blocks to be able @@ -1009,7 +1009,7 @@ func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, // LatestLogByEventSigWithConfs finds the latest log that has confs number of blocks on top of the log. func (lp *logPoller) LatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) { - return lp.orm.SelectLatestLogEventSigWithConfs(eventSig, address, confs, qopts...) + return lp.orm.SelectLatestLogByEventSigWithConfs(eventSig, address, confs, qopts...) } func (lp *logPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error) { @@ -1017,7 +1017,7 @@ func (lp *logPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs } func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) { - return lp.orm.SelectLatestBlockNumberEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) + return lp.orm.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) } // GetBlocksRange tries to get the specified block numbers from the log pollers diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index 5f1c21a5b81..271d8c2a582 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -31,7 +31,7 @@ var ( ) // Validate that filters stored in log_filters_table match the filters stored in memory -func validateFiltersTable(t *testing.T, lp *logPoller, orm *ORM) { +func validateFiltersTable(t *testing.T, lp *logPoller, orm *DbORM) { filters, err := orm.LoadFilters() require.NoError(t, err) require.Equal(t, len(filters), len(lp.filters)) diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index 3d4218d6ffd..e21fc0f3838 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -43,7 +43,7 @@ func logRuntime(t testing.TB, start time.Time) { t.Log("runtime", time.Since(start)) } -func populateDatabase(t testing.TB, o *logpoller.ORM, chainID *big.Int) (common.Hash, common.Address, common.Address) { +func populateDatabase(t testing.TB, o *logpoller.DbORM, chainID *big.Int) (common.Hash, common.Address, common.Address) { event1 := EmitterABI.Events["Log1"].ID address1 := common.HexToAddress("0x2ab9a2Dc53736b361b72d900CdF9F78F9406fbbb") address2 := common.HexToAddress("0x6E225058950f237371261C985Db6bDe26df2200E") @@ -110,7 +110,7 @@ func TestPopulateLoadedDB(t *testing.T) { func() { defer logRuntime(t, time.Now()) - _, err1 := o.SelectLogsByBlockRangeFilter(750000, 800000, address1, event1) + _, err1 := o.SelectLogs(750000, 800000, address1, event1) require.NoError(t, err1) }() func() { @@ -123,7 +123,7 @@ func TestPopulateLoadedDB(t *testing.T) { require.NoError(t, o.InsertBlock(common.HexToHash("0x10"), 1000000, time.Now())) func() { defer logRuntime(t, time.Now()) - lgs, err1 := o.SelectDataWordRange(address1, event1, 0, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0) + lgs, err1 := o.SelectLogsDataWordRange(address1, event1, 0, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0) require.NoError(t, err1) // 10 since every other log is for address1 assert.Equal(t, 10, len(lgs)) @@ -138,7 +138,7 @@ func TestPopulateLoadedDB(t *testing.T) { func() { defer logRuntime(t, time.Now()) - lgs, err1 := o.SelectIndexLogsTopicRange(address1, event1, 1, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0) + lgs, err1 := o.SelectIndexedLogsTopicRange(address1, event1, 1, logpoller.EvmWord(500000), logpoller.EvmWord(500020), 0) require.NoError(t, err1) assert.Equal(t, 10, len(lgs)) }() diff --git a/core/chains/evm/logpoller/observability.go b/core/chains/evm/logpoller/observability.go index 8dfa6e81d0d..4e0ebe74184 100644 --- a/core/chains/evm/logpoller/observability.go +++ b/core/chains/evm/logpoller/observability.go @@ -1,11 +1,13 @@ package logpoller import ( + "math/big" "time" "github.com/ethereum/go-ethereum/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/smartcontractkit/sqlx" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -46,113 +48,199 @@ var ( }, []string{"evmChainID", "query"}) ) -// ObservedLogPoller is a decorator layer for LogPoller, responsible for pushing Prometheus metrics reporting duration and size of result set for some of the queries. -// It doesn't change internal logic, because all calls are delegated to the origin LogPoller -type ObservedLogPoller struct { - LogPoller +// ObservedORM is a decorator layer for ORM used by LogPoller, responsible for pushing Prometheus metrics reporting duration and size of result set for the queries. +// It doesn't change internal logic, because all calls are delegated to the origin ORM +type ObservedORM struct { + ORM queryDuration *prometheus.HistogramVec datasetSize *prometheus.GaugeVec chainId string } -// NewObservedLogPoller creates an observed version of log poller created by NewLogPoller +// NewObservedORM creates an observed version of log poller's ORM created by NewORM // Please see ObservedLogPoller for more details on how latencies are measured -func NewObservedLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration, - finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepBlocksDepth int64) LogPoller { - - return &ObservedLogPoller{ - LogPoller: NewLogPoller(orm, ec, lggr, pollPeriod, finalityDepth, backfillBatchSize, rpcBatchSize, keepBlocksDepth), +func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *ObservedORM { + return &ObservedORM{ + ORM: NewORM(chainID, db, lggr, cfg), queryDuration: lpQueryDuration, datasetSize: lpQueryDataSets, - chainId: orm.chainID.String(), + chainId: chainID.String(), } } -func (o *ObservedLogPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "LogsCreatedAfter", func() ([]Log, error) { - return o.LogPoller.LogsCreatedAfter(eventSig, address, after, confs, qopts...) +func (o *ObservedORM) Q() pg.Q { + return o.ORM.Q() +} + +func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { + return withObservedExec(o, "InsertLogs", func() error { + return o.ORM.InsertLogs(logs, qopts...) + }) +} + +func (o *ObservedORM) InsertBlock(h common.Hash, n int64, t time.Time, qopts ...pg.QOpt) error { + return withObservedExec(o, "InsertBlock", func() error { + return o.ORM.InsertBlock(h, n, t, qopts...) + }) +} + +func (o *ObservedORM) InsertFilter(filter Filter, qopts ...pg.QOpt) error { + return withObservedExec(o, "InsertFilter", func() error { + return o.ORM.InsertFilter(filter, qopts...) + }) +} + +func (o *ObservedORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { + return withObservedQuery(o, "LoadFilters", func() (map[string]Filter, error) { + return o.ORM.LoadFilters(qopts...) + }) +} + +func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error { + return withObservedExec(o, "DeleteFilter", func() error { + return o.ORM.DeleteFilter(name, qopts...) + }) +} + +func (o *ObservedORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error { + return withObservedExec(o, "DeleteBlocksAfter", func() error { + return o.ORM.DeleteBlocksAfter(start, qopts...) + }) +} + +func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { + return withObservedExec(o, "DeleteBlocksBefore", func() error { + return o.ORM.DeleteBlocksBefore(end, qopts...) + }) +} + +func (o *ObservedORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error { + return withObservedExec(o, "DeleteLogsAfter", func() error { + return o.ORM.DeleteLogsAfter(start, qopts...) + }) +} + +func (o *ObservedORM) DeleteExpiredLogs(qopts ...pg.QOpt) error { + return withObservedExec(o, "DeleteExpiredLogs", func() error { + return o.ORM.DeleteExpiredLogs(qopts...) + }) +} + +func (o *ObservedORM) SelectBlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) { + return withObservedQuery(o, "SelectBlockByNumber", func() (*LogPollerBlock, error) { + return o.ORM.SelectBlockByNumber(n, qopts...) + }) +} + +func (o *ObservedORM) SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error) { + return withObservedQuery(o, "SelectLatestBlock", func() (*LogPollerBlock, error) { + return o.ORM.SelectLatestBlock(qopts...) + }) +} + +func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) { + return withObservedQuery(o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { + return o.ORM.SelectLatestLogByEventSigWithConfs(eventSig, address, confs, qopts...) + }) +} + +func (o *ObservedORM) SelectLogsWithSigs(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogsWithSigs", func() ([]Log, error) { + return o.ORM.SelectLogsWithSigs(start, end, address, eventSigs, qopts...) }) } -func (o *ObservedLogPoller) LatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) { - return withObservedQuery(o, "LatestLogByEventSigWithConfs", func() (*Log, error) { - return o.LogPoller.LatestLogByEventSigWithConfs(eventSig, address, confs, qopts...) +func (o *ObservedORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogsCreatedAfter", func() ([]Log, error) { + return o.ORM.SelectLogsCreatedAfter(address, eventSig, after, confs, qopts...) }) } -func (o *ObservedLogPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "LatestLogEventSigsAddrsWithConfs", func() ([]Log, error) { - return o.LogPoller.LatestLogEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) +func (o *ObservedORM) SelectIndexedLogs(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectIndexedLogs", func() ([]Log, error) { + return o.ORM.SelectIndexedLogs(address, eventSig, topicIndex, topicValues, confs, qopts...) }) } -func (o *ObservedLogPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) { - return withObservedQuery(o, "LatestBlockByEventSigsAddrsWithConfs", func() (int64, error) { - return o.LogPoller.LatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) +func (o *ObservedORM) SelectIndexedLogsByBlockRange(start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectIndexedLogsByBlockRange", func() ([]Log, error) { + return o.ORM.SelectIndexedLogsByBlockRange(start, end, address, eventSig, topicIndex, topicValues, qopts...) }) } -func (o *ObservedLogPoller) IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "IndexedLogs", func() ([]Log, error) { - return o.LogPoller.IndexedLogs(eventSig, address, topicIndex, topicValues, confs, qopts...) +func (o *ObservedORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectIndexedLogsCreatedAfter", func() ([]Log, error) { + return o.ORM.SelectIndexedLogsCreatedAfter(address, eventSig, topicIndex, topicValues, after, confs, qopts...) }) } -func (o *ObservedLogPoller) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "IndexedLogsByBlockRange", func() ([]Log, error) { - return o.LogPoller.IndexedLogsByBlockRange(start, end, eventSig, address, topicIndex, topicValues, qopts...) +func (o *ObservedORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectIndexedLogsWithSigsExcluding", func() ([]Log, error) { + return o.ORM.SelectIndexedLogsWithSigsExcluding(sigA, sigB, topicIndex, address, startBlock, endBlock, confs, qopts...) }) } -func (o *ObservedLogPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "IndexedLogsCreatedAfter", func() ([]Log, error) { - return o.LogPoller.IndexedLogsCreatedAfter(eventSig, address, topicIndex, topicValues, after, confs, qopts...) +func (o *ObservedORM) SelectLogs(start, end int64, address common.Address, eventSig common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogs", func() ([]Log, error) { + return o.ORM.SelectLogs(start, end, address, eventSig, qopts...) }) } -func (o *ObservedLogPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { +func (o *ObservedORM) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { return withObservedQueryAndResults(o, "IndexedLogsByTxHash", func() ([]Log, error) { - return o.LogPoller.IndexedLogsByTxHash(eventSig, txHash, qopts...) + return o.ORM.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...) }) } -func (o *ObservedLogPoller) IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "IndexedLogsTopicGreaterThan", func() ([]Log, error) { - return o.LogPoller.IndexedLogsTopicGreaterThan(eventSig, address, topicIndex, topicValueMin, confs, qopts...) +func (o *ObservedORM) GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) { + return withObservedQueryAndResults(o, "GetBlocksRange", func() ([]LogPollerBlock, error) { + return o.ORM.GetBlocksRange(start, end, qopts...) }) } -func (o *ObservedLogPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "IndexedLogsTopicRange", func() ([]Log, error) { - return o.LogPoller.IndexedLogsTopicRange(eventSig, address, topicIndex, topicValueMin, topicValueMax, confs, qopts...) +func (o *ObservedORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLatestLogEventSigsAddrsWithConfs", func() ([]Log, error) { + return o.ORM.SelectLatestLogEventSigsAddrsWithConfs(fromBlock, addresses, eventSigs, confs, qopts...) }) } -func (o *ObservedLogPoller) IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "IndexedLogsWithSigsExcluding", func() ([]Log, error) { - return o.LogPoller.IndexedLogsWithSigsExcluding(address, eventSigA, eventSigB, topicIndex, fromBlock, toBlock, confs, qopts...) +func (o *ObservedORM) SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) { + return withObservedQuery(o, "SelectLatestBlockByEventSigsAddrsWithConfs", func() (int64, error) { + return o.ORM.SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...) }) } -func (o *ObservedLogPoller) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "LogsDataWordRange", func() ([]Log, error) { - return o.LogPoller.LogsDataWordRange(eventSig, address, wordIndex, wordValueMin, wordValueMax, confs, qopts...) +func (o *ObservedORM) SelectLogsDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogsDataWordRange", func() ([]Log, error) { + return o.ORM.SelectLogsDataWordRange(address, eventSig, wordIndex, wordValueMin, wordValueMax, confs, qopts...) }) } -func (o *ObservedLogPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "LogsDataWordGreaterThan", func() ([]Log, error) { - return o.LogPoller.LogsDataWordGreaterThan(eventSig, address, wordIndex, wordValueMin, confs, qopts...) +func (o *ObservedORM) SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogsDataWordGreaterThan", func() ([]Log, error) { + return o.ORM.SelectLogsDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, confs, qopts...) }) } -func (o *ObservedLogPoller) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { - return withObservedQueryAndResults(o, "LogsUntilBlockHashDataWordGreaterThan", func() ([]Log, error) { - return o.LogPoller.LogsUntilBlockHashDataWordGreaterThan(eventSig, address, wordIndex, wordValueMin, untilBlockHash, qopts...) +func (o *ObservedORM) SelectLogsUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectLogsUntilBlockHashDataWordGreaterThan", func() ([]Log, error) { + return o.ORM.SelectLogsUntilBlockHashDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, untilBlockHash, qopts...) }) } -func withObservedQueryAndResults[T any](o *ObservedLogPoller, queryName string, query func() ([]T, error)) ([]T, error) { +func (o *ObservedORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectIndexedLogsTopicGreaterThan", func() ([]Log, error) { + return o.ORM.SelectIndexedLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...) + }) +} + +func (o *ObservedORM) SelectIndexedLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { + return withObservedQueryAndResults(o, "SelectIndexedLogsTopicRange", func() ([]Log, error) { + return o.ORM.SelectIndexedLogsTopicRange(address, eventSig, topicIndex, topicValueMin, topicValueMax, confs, qopts...) + }) +} + +func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) { results, err := withObservedQuery(o, queryName, query) if err == nil { o.datasetSize. @@ -162,7 +250,7 @@ func withObservedQueryAndResults[T any](o *ObservedLogPoller, queryName string, return results, err } -func withObservedQuery[T any](o *ObservedLogPoller, queryName string, query func() (T, error)) (T, error) { +func withObservedQuery[T any](o *ObservedORM, queryName string, query func() (T, error)) (T, error) { queryStarted := time.Now() defer func() { o.queryDuration. @@ -171,3 +259,13 @@ func withObservedQuery[T any](o *ObservedLogPoller, queryName string, query func }() return query() } + +func withObservedExec(o *ObservedORM, query string, exec func() error) error { + queryStarted := time.Now() + defer func() { + o.queryDuration. + WithLabelValues(o.chainId, query). + Observe(float64(time.Since(queryStarted))) + }() + return exec() +} diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index 5bd0a772d96..c26b487bbd0 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -25,20 +25,22 @@ func TestMultipleMetricsArePublished(t *testing.T) { lp := createObservedPollLogger(t, 100) require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration)) - _, _ = lp.IndexedLogs(common.Hash{}, common.Address{}, 1, []common.Hash{}, 1, pg.WithParentCtx(ctx)) - _, _ = lp.IndexedLogsByBlockRange(0, 1, common.Hash{}, common.Address{}, 1, []common.Hash{}, pg.WithParentCtx(ctx)) - _, _ = lp.IndexedLogsTopicGreaterThan(common.Hash{}, common.Address{}, 1, common.Hash{}, 1, pg.WithParentCtx(ctx)) - _, _ = lp.IndexedLogsTopicRange(common.Hash{}, common.Address{}, 1, common.Hash{}, common.Hash{}, 1, pg.WithParentCtx(ctx)) - _, _ = lp.IndexedLogsWithSigsExcluding(common.Address{}, common.Hash{}, common.Hash{}, 1, 0, 1, 1, pg.WithParentCtx(ctx)) - _, _ = lp.LogsDataWordRange(common.Hash{}, common.Address{}, 0, common.Hash{}, common.Hash{}, 1, pg.WithParentCtx(ctx)) - _, _ = lp.LogsDataWordGreaterThan(common.Hash{}, common.Address{}, 0, common.Hash{}, 1, pg.WithParentCtx(ctx)) - _, _ = lp.LogsCreatedAfter(common.Hash{}, common.Address{}, time.Now(), 0, pg.WithParentCtx(ctx)) - _, _ = lp.LatestLogByEventSigWithConfs(common.Hash{}, common.Address{}, 0, pg.WithParentCtx(ctx)) - _, _ = lp.LatestLogEventSigsAddrsWithConfs(0, []common.Hash{{}}, []common.Address{{}}, 1, pg.WithParentCtx(ctx)) - _, _ = lp.IndexedLogsCreatedAfter(common.Hash{}, common.Address{}, 0, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx)) - _, _ = lp.LogsUntilBlockHashDataWordGreaterThan(common.Hash{}, common.Address{}, 0, common.Hash{}, common.Hash{}, pg.WithParentCtx(ctx)) - - require.Equal(t, 12, testutil.CollectAndCount(lp.queryDuration)) + _, _ = lp.SelectIndexedLogs(common.Address{}, common.Hash{}, 1, []common.Hash{}, 1, pg.WithParentCtx(ctx)) + _, _ = lp.SelectIndexedLogsByBlockRange(0, 1, common.Address{}, common.Hash{}, 1, []common.Hash{}, pg.WithParentCtx(ctx)) + _, _ = lp.SelectIndexedLogsTopicGreaterThan(common.Address{}, common.Hash{}, 1, common.Hash{}, 1, pg.WithParentCtx(ctx)) + _, _ = lp.SelectIndexedLogsTopicRange(common.Address{}, common.Hash{}, 1, common.Hash{}, common.Hash{}, 1, pg.WithParentCtx(ctx)) + _, _ = lp.SelectIndexedLogsWithSigsExcluding(common.Hash{}, common.Hash{}, 1, common.Address{}, 0, 1, 1, pg.WithParentCtx(ctx)) + _, _ = lp.SelectLogsDataWordRange(common.Address{}, common.Hash{}, 0, common.Hash{}, common.Hash{}, 1, pg.WithParentCtx(ctx)) + _, _ = lp.SelectLogsDataWordGreaterThan(common.Address{}, common.Hash{}, 0, common.Hash{}, 1, pg.WithParentCtx(ctx)) + _, _ = lp.SelectLogsCreatedAfter(common.Address{}, common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx)) + _, _ = lp.SelectLatestLogByEventSigWithConfs(common.Hash{}, common.Address{}, 0, pg.WithParentCtx(ctx)) + _, _ = lp.SelectLatestLogEventSigsAddrsWithConfs(0, []common.Address{{}}, []common.Hash{{}}, 1, pg.WithParentCtx(ctx)) + _, _ = lp.SelectIndexedLogsCreatedAfter(common.Address{}, common.Hash{}, 0, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx)) + _, _ = lp.SelectLogsUntilBlockHashDataWordGreaterThan(common.Address{}, common.Hash{}, 0, common.Hash{}, common.Hash{}, pg.WithParentCtx(ctx)) + _ = lp.InsertLogs([]Log{}, pg.WithParentCtx(ctx)) + _ = lp.InsertBlock(common.Hash{}, 0, time.Now(), pg.WithParentCtx(ctx)) + + require.Equal(t, 14, testutil.CollectAndCount(lp.queryDuration)) require.Equal(t, 10, testutil.CollectAndCount(lp.datasetSize)) resetMetrics(*lp) } @@ -48,31 +50,15 @@ func TestShouldPublishDurationInCaseOfError(t *testing.T) { lp := createObservedPollLogger(t, 200) require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration)) - _, err := lp.LatestLogByEventSigWithConfs(common.Hash{}, common.Address{}, 0, pg.WithParentCtx(ctx)) + _, err := lp.SelectLatestLogByEventSigWithConfs(common.Hash{}, common.Address{}, 0, pg.WithParentCtx(ctx)) require.Error(t, err) require.Equal(t, 1, testutil.CollectAndCount(lp.queryDuration)) - require.Equal(t, 1, counterFromHistogramByLabels(t, lp.queryDuration, "200", "LatestLogByEventSigWithConfs")) + require.Equal(t, 1, counterFromHistogramByLabels(t, lp.queryDuration, "200", "SelectLatestLogByEventSigWithConfs")) resetMetrics(*lp) } -func TestNotObservedFunctions(t *testing.T) { - ctx := testutils.Context(t) - lp := createObservedPollLogger(t, 300) - require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration)) - - _, err := lp.Logs(0, 1, common.Hash{}, common.Address{}, pg.WithParentCtx(ctx)) - require.NoError(t, err) - - _, err = lp.LogsWithSigs(0, 1, []common.Hash{{}}, common.Address{}, pg.WithParentCtx(ctx)) - require.NoError(t, err) - - require.Equal(t, 0, testutil.CollectAndCount(lp.queryDuration)) - require.Equal(t, 0, testutil.CollectAndCount(lp.datasetSize)) - resetMetrics(*lp) -} - func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) { lp := createObservedPollLogger(t, 420) expectedCount := 9 @@ -105,16 +91,23 @@ func TestNotPublishingDatasetSizeInCaseOfError(t *testing.T) { require.Equal(t, 0, counterFromGaugeByLabels(lp.datasetSize, "420", "errorQuery")) } -func createObservedPollLogger(t *testing.T, chainId int64) *ObservedLogPoller { +func TestMetricsAreProperlyPopulatedForWrites(t *testing.T) { + lp := createObservedPollLogger(t, 420) + require.NoError(t, withObservedExec(lp, "execQuery", func() error { return nil })) + require.Error(t, withObservedExec(lp, "execQuery", func() error { return fmt.Errorf("error") })) + + require.Equal(t, 2, counterFromHistogramByLabels(t, lp.queryDuration, "420", "execQuery")) +} + +func createObservedPollLogger(t *testing.T, chainId int64) *ObservedORM { lggr, _ := logger.TestLoggerObserved(t, zapcore.ErrorLevel) db := pgtest.NewSqlxDB(t) - orm := NewORM(big.NewInt(chainId), db, lggr, pgtest.NewQConfig(true)) - return NewObservedLogPoller( - orm, nil, lggr, 1, 1, 1, 1, 1000, - ).(*ObservedLogPoller) + return NewObservedORM( + big.NewInt(chainId), db, lggr, pgtest.NewQConfig(true), + ) } -func resetMetrics(lp ObservedLogPoller) { +func resetMetrics(lp ObservedORM) { lp.queryDuration.Reset() lp.datasetSize.Reset() } diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index b26c4ac7df6..8cb80094c0f 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -16,23 +16,67 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils" ) -type ORM struct { +// ORM represents the persistent data access layer used by the log poller. At this moment, it's a bit leaky abstraction, because +// it exposes some of the database implementation details (e.g. pg.Q). Ideally it should be agnostic and could be applied to any persistence layer. +// What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM. +type ORM interface { + Q() pg.Q + InsertLogs(logs []Log, qopts ...pg.QOpt) error + InsertBlock(h common.Hash, n int64, t time.Time, qopts ...pg.QOpt) error + InsertFilter(filter Filter, qopts ...pg.QOpt) error + + LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) + DeleteFilter(name string, qopts ...pg.QOpt) error + + DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error + DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error + DeleteLogsAfter(start int64, qopts ...pg.QOpt) error + DeleteExpiredLogs(qopts ...pg.QOpt) error + + GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) + SelectBlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) + SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error) + + SelectLogs(start, end int64, address common.Address, eventSig common.Hash, qopts ...pg.QOpt) ([]Log, error) + SelectLogsWithSigs(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) ([]Log, error) + SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectLatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) + SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) + + SelectIndexedLogs(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectIndexedLogsByBlockRange(start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) + SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectIndexedLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) + SelectLogsDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) + SelectLogsUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) +} + +type DbORM struct { chainID *big.Int q pg.Q } -// NewORM creates an ORM scoped to chainID. -func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *ORM { +// NewORM creates a DbORM scoped to chainID. +func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *DbORM { namedLogger := lggr.Named("Configs") q := pg.NewQ(db, namedLogger, cfg) - return &ORM{ + return &DbORM{ chainID: chainID, q: q, } } +func (o *DbORM) Q() pg.Q { + return o.q +} + // InsertBlock is idempotent to support replays. -func (o *ORM) InsertBlock(h common.Hash, n int64, t time.Time, qopts ...pg.QOpt) error { +func (o *DbORM) InsertBlock(h common.Hash, n int64, t time.Time, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) err := q.ExecQ(`INSERT INTO evm.log_poller_blocks (evm_chain_id, block_hash, block_number, block_timestamp, created_at) VALUES ($1, $2, $3, $4, NOW()) ON CONFLICT DO NOTHING`, utils.NewBig(o.chainID), h[:], n, t) @@ -43,7 +87,7 @@ func (o *ORM) InsertBlock(h common.Hash, n int64, t time.Time, qopts ...pg.QOpt) // // Each address/event pair must have a unique job id, so it may be removed when the job is deleted. // If a second job tries to overwrite the same pair, this should fail. -func (o *ORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { +func (o *DbORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { q := o.q.WithOpts(qopts...) addresses := make([][]byte, 0) events := make([][]byte, 0) @@ -65,13 +109,13 @@ func (o *ORM) InsertFilter(filter Filter, qopts ...pg.QOpt) (err error) { } // DeleteFilter removes all events,address pairs associated with the Filter -func (o *ORM) DeleteFilter(name string, qopts ...pg.QOpt) error { +func (o *DbORM) DeleteFilter(name string, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) return q.ExecQ(`DELETE FROM evm.log_poller_filters WHERE name = $1 AND evm_chain_id = $2`, name, utils.NewBig(o.chainID)) } // LoadFiltersForChain returns all filters for this chain -func (o *ORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { +func (o *DbORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { q := o.q.WithOpts(qopts...) rows := make([]Filter, 0) err := q.Select(&rows, `SELECT name, @@ -88,7 +132,7 @@ func (o *ORM) LoadFilters(qopts ...pg.QOpt) (map[string]Filter, error) { return filters, err } -func (o *ORM) SelectBlockByHash(h common.Hash, qopts ...pg.QOpt) (*LogPollerBlock, error) { +func (o *DbORM) SelectBlockByHash(h common.Hash, qopts ...pg.QOpt) (*LogPollerBlock, error) { q := o.q.WithOpts(qopts...) var b LogPollerBlock if err := q.Get(&b, `SELECT * FROM evm.log_poller_blocks WHERE block_hash = $1 AND evm_chain_id = $2`, h, utils.NewBig(o.chainID)); err != nil { @@ -97,7 +141,7 @@ func (o *ORM) SelectBlockByHash(h common.Hash, qopts ...pg.QOpt) (*LogPollerBloc return &b, nil } -func (o *ORM) SelectBlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) { +func (o *DbORM) SelectBlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) { q := o.q.WithOpts(qopts...) var b LogPollerBlock if err := q.Get(&b, `SELECT * FROM evm.log_poller_blocks WHERE block_number = $1 AND evm_chain_id = $2`, n, utils.NewBig(o.chainID)); err != nil { @@ -106,7 +150,7 @@ func (o *ORM) SelectBlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, e return &b, nil } -func (o *ORM) SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error) { +func (o *DbORM) SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error) { q := o.q.WithOpts(qopts...) var b LogPollerBlock if err := q.Get(&b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1`, utils.NewBig(o.chainID)); err != nil { @@ -115,7 +159,7 @@ func (o *ORM) SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error) { return &b, nil } -func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) { +func (o *DbORM) SelectLatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) { q := o.q.WithOpts(qopts...) var l Log if err := q.Get(&l, `SELECT * FROM evm.logs @@ -130,19 +174,19 @@ func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address com } // DeleteBlocksAfter delete all blocks after and including start. -func (o *ORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error { +func (o *DbORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) return q.ExecQ(`DELETE FROM evm.log_poller_blocks WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID)) } // DeleteBlocksBefore delete all blocks before and including end. -func (o *ORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { +func (o *DbORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) _, err := q.Exec(`DELETE FROM evm.log_poller_blocks WHERE block_number <= $1 AND evm_chain_id = $2`, end, utils.NewBig(o.chainID)) return err } -func (o *ORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error { +func (o *DbORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error { q := o.q.WithOpts(qopts...) return q.ExecQ(`DELETE FROM evm.logs WHERE block_number >= $1 AND evm_chain_id = $2`, start, utils.NewBig(o.chainID)) } @@ -155,7 +199,7 @@ type Exp struct { ShouldDelete bool } -func (o *ORM) DeleteExpiredLogs(qopts ...pg.QOpt) error { +func (o *DbORM) DeleteExpiredLogs(qopts ...pg.QOpt) error { qopts = append(qopts, pg.WithLongQueryTimeout()) q := o.q.WithOpts(qopts...) @@ -170,7 +214,7 @@ func (o *ORM) DeleteExpiredLogs(qopts ...pg.QOpt) error { } // InsertLogs is idempotent to support replays. -func (o *ORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { +func (o *DbORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { for _, log := range logs { if o.chainID.Cmp(log.EvmChainId.ToInt()) != 0 { return errors.Errorf("invalid chainID in log got %v want %v", log.EvmChainId.ToInt(), o.chainID) @@ -203,7 +247,7 @@ func (o *ORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error { return nil } -func (o *ORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) { +func (o *DbORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) { var logs []Log err := o.q.Select(&logs, ` SELECT * FROM evm.logs @@ -216,7 +260,7 @@ func (o *ORM) SelectLogsByBlockRange(start, end int64) ([]Log, error) { } // SelectLogsByBlockRangeFilter finds the logs in a given block range. -func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Address, eventSig common.Hash, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectLogs(start, end int64, address common.Address, eventSig common.Hash, qopts ...pg.QOpt) ([]Log, error) { var logs []Log q := o.q.WithOpts(qopts...) err := q.Select(&logs, ` @@ -231,7 +275,7 @@ func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Addr } // SelectLogsCreatedAfter finds logs created after some timestamp. -func (o *ORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...) if err != nil { return nil, err @@ -255,7 +299,7 @@ func (o *ORM) SelectLogsCreatedAfter(address common.Address, eventSig common.Has // SelectLogsWithSigsByBlockRangeFilter finds the logs in the given block range with the given event signatures // emitted from the given address. -func (o *ORM) SelectLogsWithSigsByBlockRangeFilter(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) (logs []Log, err error) { +func (o *DbORM) SelectLogsWithSigs(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) (logs []Log, err error) { q := o.q.WithOpts(qopts...) sigs := make([][]byte, 0, len(eventSigs)) for _, sig := range eventSigs { @@ -293,7 +337,7 @@ ORDER BY (evm.logs.block_number, evm.logs.log_index)`, a) return logs, err } -func (o *ORM) GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) { +func (o *DbORM) GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) { var blocks []LogPollerBlock q := o.q.WithOpts(qopts...) err := q.Select(&blocks, ` @@ -307,7 +351,7 @@ func (o *ORM) GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogP } // SelectLatestLogEventSigsAddrsWithConfs finds the latest log by (address, event) combination that matches a list of Addresses and list of events -func (o *ORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { var logs []Log sigs := concatBytes(eventSigs) addrs := concatBytes(addresses) @@ -332,7 +376,7 @@ func (o *ORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses } // SelectLatestBlockNumberEventSigsAddrsWithConfs finds the latest block number that matches a list of Addresses and list of events. It returns 0 if there is no matching block -func (o *ORM) SelectLatestBlockNumberEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) { +func (o *DbORM) SelectLatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) { var blockNumber int64 sigs := concatBytes(eventSigs) addrs := concatBytes(addresses) @@ -352,7 +396,7 @@ func (o *ORM) SelectLatestBlockNumberEventSigsAddrsWithConfs(fromBlock int64, ev return blockNumber, nil } -func (o *ORM) SelectDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectLogsDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { var logs []Log q := o.q.WithOpts(qopts...) err := q.Select(&logs, @@ -369,7 +413,7 @@ func (o *ORM) SelectDataWordRange(address common.Address, eventSig common.Hash, return logs, nil } -func (o *ORM) SelectDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectLogsDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { var logs []Log q := o.q.WithOpts(qopts...) err := q.Select(&logs, @@ -385,7 +429,7 @@ func (o *ORM) SelectDataWordGreaterThan(address common.Address, eventSig common. return logs, nil } -func (o *ORM) SelectIndexLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { if err := validateTopicIndex(topicIndex); err != nil { return nil, err } @@ -405,7 +449,7 @@ func (o *ORM) SelectIndexLogsTopicGreaterThan(address common.Address, eventSig c return logs, nil } -func (o *ORM) SelectUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectLogsUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { var logs []Log q := o.q.WithOpts(qopts...) err := q.Transaction(func(tx pg.Queryer) error { @@ -430,7 +474,7 @@ func (o *ORM) SelectUntilBlockHashDataWordGreaterThan(address common.Address, ev return logs, nil } -func (o *ORM) SelectIndexLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { if err := validateTopicIndex(topicIndex); err != nil { return nil, err } @@ -451,7 +495,7 @@ func (o *ORM) SelectIndexLogsTopicRange(address common.Address, eventSig common. return logs, nil } -func (o *ORM) SelectIndexedLogs(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogs(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) { if err := validateTopicIndex(topicIndex); err != nil { return nil, err } @@ -474,7 +518,7 @@ func (o *ORM) SelectIndexedLogs(address common.Address, eventSig common.Hash, to } // SelectIndexedLogsByBlockRangeFilter finds the indexed logs in a given block range. -func (o *ORM) SelectIndexedLogsByBlockRangeFilter(start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogsByBlockRange(start, end int64, address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) { if err := validateTopicIndex(topicIndex); err != nil { return nil, err } @@ -502,12 +546,11 @@ func validateTopicIndex(index int) error { return nil } -func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) { minBlock, maxBlock, err := o.blocksRangeAfterTimestamp(after, confs, qopts...) if err != nil { return nil, err } - var logs []Log q := o.q.WithOpts(qopts...) topicValuesBytes := concatBytes(topicValues) @@ -527,7 +570,7 @@ func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig com return logs, nil } -func (o *ORM) SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) { q := o.q.WithOpts(qopts...) var logs []Log err := q.Select(&logs, ` @@ -544,7 +587,7 @@ func (o *ORM) SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash } // SelectIndexedLogsWithSigsExcluding query's for logs that have signature A and exclude logs that have a corresponding signature B, matching is done based on the topic index both logs should be inside the block range and have the minimum number of confirmations -func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error) { +func (o *DbORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIndex int, address common.Address, startBlock, endBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error) { if err := validateTopicIndex(topicIndex); err != nil { return nil, err } @@ -582,7 +625,7 @@ func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIn return logs, nil } -func (o *ORM) blocksRangeAfterTimestamp(after time.Time, confs int, qopts ...pg.QOpt) (int64, int64, error) { +func (o *DbORM) blocksRangeAfterTimestamp(after time.Time, confs int, qopts ...pg.QOpt) (int64, int64, error) { type blockRange struct { MinBlockNumber int64 `db:"min_block"` MaxBlockNumber int64 `db:"max_block"` diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index f9b51000bb1..531aec2ff30 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -293,36 +293,36 @@ func TestORM(t *testing.T) { require.Equal(t, 1, len(logs)) assert.Equal(t, []byte("hello"), logs[0].Data) - logs, err = o1.SelectLogsByBlockRangeFilter(1, 1, common.HexToAddress("0x1234"), topic) + logs, err = o1.SelectLogs(1, 1, common.HexToAddress("0x1234"), topic) require.NoError(t, err) assert.Equal(t, 0, len(logs)) - logs, err = o1.SelectLogsByBlockRangeFilter(10, 10, common.HexToAddress("0x1234"), topic) + logs, err = o1.SelectLogs(10, 10, common.HexToAddress("0x1234"), topic) require.NoError(t, err) require.Equal(t, 1, len(logs)) // With no blocks, should be an error - _, err = o1.SelectLatestLogEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) + _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) // With block 10, only 0 confs should work require.NoError(t, o1.InsertBlock(common.HexToHash("0x1234"), 10, time.Now())) - log, err := o1.SelectLatestLogEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) + log, err := o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) require.NoError(t, err) assert.Equal(t, int64(10), log.BlockNumber) - _, err = o1.SelectLatestLogEventSigWithConfs(topic, common.HexToAddress("0x1234"), 1) + _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 1) require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) // With block 12, anything <=2 should work require.NoError(t, o1.DeleteBlocksAfter(10)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1234"), 11, time.Now())) require.NoError(t, o1.InsertBlock(common.HexToHash("0x1235"), 12, time.Now())) - _, err = o1.SelectLatestLogEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) + _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 0) require.NoError(t, err) - _, err = o1.SelectLatestLogEventSigWithConfs(topic, common.HexToAddress("0x1234"), 1) + _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 1) require.NoError(t, err) - _, err = o1.SelectLatestLogEventSigWithConfs(topic, common.HexToAddress("0x1234"), 2) + _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 2) require.NoError(t, err) - _, err = o1.SelectLatestLogEventSigWithConfs(topic, common.HexToAddress("0x1234"), 3) + _, err = o1.SelectLatestLogByEventSigWithConfs(topic, common.HexToAddress("0x1234"), 3) require.Error(t, err) assert.True(t, errors.Is(err, sql.ErrNoRows)) @@ -423,7 +423,7 @@ func TestORM(t *testing.T) { require.Zero(t, len(logs)) } -func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.ORM, addr common.Address, blockNumber int, eventSig common.Hash, start, stop int) { +func insertLogsTopicValueRange(t *testing.T, chainID *big.Int, o *logpoller.DbORM, addr common.Address, blockNumber int, eventSig common.Hash, start, stop int) { var lgs []logpoller.Log for i := start; i <= stop; i++ { lgs = append(lgs, logpoller.Log{ @@ -459,45 +459,45 @@ func TestORM_IndexedLogs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - lgs, err = o1.SelectIndexedLogsByBlockRangeFilter(1, 1, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)}) + lgs, err = o1.SelectIndexedLogsByBlockRange(1, 1, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)}) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.SelectIndexedLogsByBlockRangeFilter(1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(2)}) + lgs, err = o1.SelectIndexedLogsByBlockRange(1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(2)}) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - lgs, err = o1.SelectIndexedLogsByBlockRangeFilter(1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)}) + lgs, err = o1.SelectIndexedLogsByBlockRange(1, 2, addr, eventSig, 1, []common.Hash{logpoller.EvmWord(1)}) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) - _, err = o1.SelectIndexedLogsByBlockRangeFilter(1, 2, addr, eventSig, 0, []common.Hash{logpoller.EvmWord(1)}) + _, err = o1.SelectIndexedLogsByBlockRange(1, 2, addr, eventSig, 0, []common.Hash{logpoller.EvmWord(1)}) require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 0") - _, err = o1.SelectIndexedLogsByBlockRangeFilter(1, 2, addr, eventSig, 4, []common.Hash{logpoller.EvmWord(1)}) + _, err = o1.SelectIndexedLogsByBlockRange(1, 2, addr, eventSig, 4, []common.Hash{logpoller.EvmWord(1)}) require.Error(t, err) assert.Contains(t, err.Error(), "invalid index for topic: 4") - lgs, err = o1.SelectIndexLogsTopicGreaterThan(addr, eventSig, 1, logpoller.EvmWord(2), 0) + lgs, err = o1.SelectIndexedLogsTopicGreaterThan(addr, eventSig, 1, logpoller.EvmWord(2), 0) require.NoError(t, err) assert.Equal(t, 2, len(lgs)) - lgs, err = o1.SelectIndexLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0) + lgs, err = o1.SelectIndexedLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) assert.Equal(t, logpoller.EvmWord(3).Bytes(), lgs[0].GetTopics()[1].Bytes()) - lgs, err = o1.SelectIndexLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(1), logpoller.EvmWord(3), 0) + lgs, err = o1.SelectIndexedLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(1), logpoller.EvmWord(3), 0) require.NoError(t, err) assert.Equal(t, 3, len(lgs)) // Check confirmations work as expected. require.NoError(t, o1.InsertBlock(common.HexToHash("0x2"), 2, time.Now())) - lgs, err = o1.SelectIndexLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(4), logpoller.EvmWord(4), 1) + lgs, err = o1.SelectIndexedLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(4), logpoller.EvmWord(4), 1) require.NoError(t, err) assert.Equal(t, 0, len(lgs)) require.NoError(t, o1.InsertBlock(common.HexToHash("0x3"), 3, time.Now())) - lgs, err = o1.SelectIndexLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(4), logpoller.EvmWord(4), 1) + lgs, err = o1.SelectIndexedLogsTopicRange(addr, eventSig, 1, logpoller.EvmWord(4), logpoller.EvmWord(4), 1) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) } @@ -600,48 +600,48 @@ func TestORM_DataWords(t *testing.T) { }, })) // Outside range should fail. - lgs, err := o1.SelectDataWordRange(addr, eventSig, 0, logpoller.EvmWord(2), logpoller.EvmWord(2), 0) + lgs, err := o1.SelectLogsDataWordRange(addr, eventSig, 0, logpoller.EvmWord(2), logpoller.EvmWord(2), 0) require.NoError(t, err) assert.Equal(t, 0, len(lgs)) // Range including log should succeed - lgs, err = o1.SelectDataWordRange(addr, eventSig, 0, logpoller.EvmWord(1), logpoller.EvmWord(2), 0) + lgs, err = o1.SelectLogsDataWordRange(addr, eventSig, 0, logpoller.EvmWord(1), logpoller.EvmWord(2), 0) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) // Range only covering log should succeed - lgs, err = o1.SelectDataWordRange(addr, eventSig, 0, logpoller.EvmWord(1), logpoller.EvmWord(1), 0) + lgs, err = o1.SelectLogsDataWordRange(addr, eventSig, 0, logpoller.EvmWord(1), logpoller.EvmWord(1), 0) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) // Cannot query for unconfirmed second log. - lgs, err = o1.SelectDataWordRange(addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0) + lgs, err = o1.SelectLogsDataWordRange(addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0) require.NoError(t, err) assert.Equal(t, 0, len(lgs)) // Confirm it, then can query. require.NoError(t, o1.InsertBlock(common.HexToHash("0x2"), 2, time.Now())) - lgs, err = o1.SelectDataWordRange(addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0) + lgs, err = o1.SelectLogsDataWordRange(addr, eventSig, 1, logpoller.EvmWord(3), logpoller.EvmWord(3), 0) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) assert.Equal(t, lgs[0].Data, append(logpoller.EvmWord(2).Bytes(), logpoller.EvmWord(3).Bytes()...)) // Check greater than 1 yields both logs. - lgs, err = o1.SelectDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), 0) + lgs, err = o1.SelectLogsDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), 0) require.NoError(t, err) assert.Equal(t, 2, len(lgs)) // Unknown hash should an error - lgs, err = o1.SelectUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x3")) + lgs, err = o1.SelectLogsUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x3")) require.Error(t, err) assert.Equal(t, 0, len(lgs)) // 1 block should include first log - lgs, err = o1.SelectUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x1")) + lgs, err = o1.SelectLogsUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x1")) require.NoError(t, err) assert.Equal(t, 1, len(lgs)) // 2 block should include both - lgs, err = o1.SelectUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x2")) + lgs, err = o1.SelectLogsUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x2")) require.NoError(t, err) assert.Equal(t, 2, len(lgs)) } @@ -651,7 +651,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { o1 := th.ORM // Insert logs on different topics, should be able to read them - // back using SelectLogsWithSigsByBlockRangeFilter and specifying + // back using SelectLogsWithSigs and specifying // said topics. topic := common.HexToHash("0x1599") topic2 := common.HexToHash("0x1600") @@ -727,7 +727,7 @@ func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) { require.NoError(t, o1.InsertLogs(inputLogs)) startBlock, endBlock := int64(10), int64(15) - logs, err := o1.SelectLogsWithSigsByBlockRangeFilter(startBlock, endBlock, sourceAddr, []common.Hash{ + logs, err := o1.SelectLogsWithSigs(startBlock, endBlock, sourceAddr, []common.Hash{ topic, topic2, }) @@ -792,7 +792,7 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000005", lgs[5].BlockHash.String()) // Filter by Address and topic - lgs, err = th.ORM.SelectLogsByBlockRangeFilter(1, 3, address1, event1) + lgs, err = th.ORM.SelectLogs(1, 3, address1, event1) require.NoError(t, err) require.Equal(t, 2, len(lgs)) assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000003", lgs[0].BlockHash.String()) @@ -802,7 +802,7 @@ func TestLogPoller_Logs(t *testing.T) { assert.Equal(t, address1, lgs[1].Address) // Filter by block - lgs, err = th.ORM.SelectLogsByBlockRangeFilter(2, 2, address2, event1) + lgs, err = th.ORM.SelectLogs(2, 2, address2, event1) require.NoError(t, err) require.Equal(t, 1, len(lgs)) assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000004", lgs[0].BlockHash.String()) @@ -832,7 +832,7 @@ func BenchmarkLogs(b *testing.B) { require.NoError(b, o.InsertLogs(lgs)) b.ResetTimer() for n := 0; n < b.N; n++ { - _, err := o.SelectDataWordRange(addr, EmitterABI.Events["Log1"].ID, 0, logpoller.EvmWord(8000), logpoller.EvmWord(8002), 0) + _, err := o.SelectLogsDataWordRange(addr, EmitterABI.Events["Log1"].ID, 0, logpoller.EvmWord(8000), logpoller.EvmWord(8002), 0) require.NoError(b, err) } } @@ -1165,7 +1165,7 @@ func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - blockNumber, err := th.ORM.SelectLatestBlockNumberEventSigsAddrsWithConfs(tt.fromBlock, tt.events, tt.addrs, tt.confs) + blockNumber, err := th.ORM.SelectLatestBlockByEventSigsAddrsWithConfs(tt.fromBlock, tt.events, tt.addrs, tt.confs) require.NoError(t, err) assert.Equal(t, tt.expectedBlockNumber, blockNumber) })