diff --git a/cmd/relayer_exporter/relayer_exporter.go b/cmd/relayer_exporter/relayer_exporter.go index cc67f48..2101258 100644 --- a/cmd/relayer_exporter/relayer_exporter.go +++ b/cmd/relayer_exporter/relayer_exporter.go @@ -68,8 +68,9 @@ func main() { } ibcCollector := collector.IBCCollector{ - RPCs: rpcs, - Paths: paths, + RPCs: rpcs, + Paths: paths, + AckProcessors: map[string]collector.AckProcessor{}, } balancesCollector := collector.WalletBalanceCollector{ diff --git a/config.yaml b/config.yaml index f9b2e14..c6a7225 100644 --- a/config.yaml +++ b/config.yaml @@ -2,7 +2,7 @@ github: org: archway-network repo: networks dir: _IBC - testnetsDir: testnets/_IBC + testnetsDir: devnets/_IBC rpc: # mainnets - chainName: archway @@ -22,7 +22,7 @@ rpc: url: https://cosmoshub-rpc.stakely.io:443 - chainName: decentr chainId: mainnet-3 - url: https://poseidon.mainnet.decentr.xyz:443 + url: https://rpc.decentr.chaintools.tech:443 - chainName: jackal chainId: jackal-1 url: https://jackal-rpc.polkachu.com:443 @@ -43,7 +43,7 @@ rpc: url: https://omniflix.kingnodes.com:443 - chainName: osmosis chainId: osmosis-1 - url: https://osmosis-rpc.stakely.io:443 + url: https://osmosis-rpc.onivalidator.com:443 - chainName: quicksilver chainId: quicksilver-2 url: https://rpc.quicksilver.zone:443 @@ -64,28 +64,35 @@ rpc: url: https://rpc.comdex.one:443 - chainName: neutron chainId: neutron-1 - url: https://rpc-kralum.neutron-1.neutron.org:443 + url: https://rpc.novel.remedy.tm.p2p.org:443 + - chainName: qwoyn + chainId: qwoyn-1 + url: https://qwoyn-rpc.staketab.org:443 - chainName: stargaze chainId: stargaze-1 url: https://rpc.stargaze-apis.com:443 + - chainName: andromeda + chainId: andromeda-1 + url: https://andromeda.rpc.kjnodes.com:443 # testnets - - chainName: archwaytestnet - chainId: constantine-3 - url: https://rpc.constantine.archway.tech:443 - - chainName: axelartestnet - chainId: axelar-testnet-lisbon-3 - url: https://axelar-testnet-rpc.qubelabs.io:443 + - chainName: 
archwaydevnet + chainId: titus-3 + url: https://rpc.titus.archway.io:443 - chainName: osmosistestnet chainId: osmo-test-5 - url: https://rpc.osmotest5.osmosis.zone:443 + url: https://rpc-1.testnet.osmosis.nodes.guru:443 accounts: - # Foundation + # foundation-feegrant-astrovault - address: archway1gpyqzc0aerc85cpk2cm8ec6zkc95x5yqrakskv chainName: archway denom: aarch + # foundation-feegrant-ojo + - address: archway1c2cu99uzjauaj5hg45edhzgsk5saz43y6d3zxp + chainName: archway + denom: aarch # PhiLabs - address: archway1ktka5q3cnsy3ar7qwj2huzz6qj9q4ys7h74l9y chainName: archway - denom: aarch + denom: aarch \ No newline at end of file diff --git a/pkg/collector/ibc_collector.go b/pkg/collector/ibc_collector.go index efabaa4..b6aed8c 100644 --- a/pkg/collector/ibc_collector.go +++ b/pkg/collector/ibc_collector.go @@ -1,8 +1,10 @@ package collector import ( + "context" "fmt" "reflect" + "strconv" "sync" "github.com/prometheus/client_golang/prometheus" @@ -14,8 +16,9 @@ import ( ) const ( - clientExpiryMetricName = "cosmos_ibc_client_expiry" - channelStuckPacketsMetricName = "cosmos_ibc_stuck_packets" + clientExpiryMetricName = "cosmos_ibc_client_expiry" + channelStuckPacketsMetricName = "cosmos_ibc_stuck_packets" + channelNewAckSinceStuckMetricName = "cosmos_ibc_new_ack_since_stuck" ) var ( @@ -33,12 +36,14 @@ var ( ) channelStuckPackets = prometheus.NewDesc( channelStuckPacketsMetricName, - "Returns stuck packets for a channel.", + "Returns number of stuck packets for a channel.", []string{ "src_channel_id", "dst_channel_id", "src_chain_id", "dst_chain_id", + "src_chain_height", + "dst_chain_height", "src_chain_name", "dst_chain_name", "discord_ids", @@ -46,19 +51,42 @@ var ( }, nil, ) + channelNewAckSinceStuck = prometheus.NewDesc( + channelNewAckSinceStuckMetricName, + "Returns block height of new observed IBC Ack since last stuck packet detection, else returns 0.", + []string{ + "src_channel_id", + "dst_channel_id", + "src_chain_id", + "dst_chain_id", + 
"src_chain_name", + "dst_chain_name", + "status", + }, + nil, + ) ) type IBCCollector struct { - RPCs *map[string]config.RPC - Paths []*config.IBCData + RPCs *map[string]config.RPC + Paths []*config.IBCData + AckProcessors map[string]AckProcessor // map[ChainName]AckProcessor +} + +type AckProcessor struct { + ChainID string + ChannelID string + StartHeight int64 + NewAckHeight chan uint64 } -func (cc IBCCollector) Describe(ch chan<- *prometheus.Desc) { - ch <- clientExpiry - ch <- channelStuckPackets +func (cc IBCCollector) Describe(metricDesc chan<- *prometheus.Desc) { + metricDesc <- clientExpiry + metricDesc <- channelStuckPackets + metricDesc <- channelNewAckSinceStuck } -func (cc IBCCollector) Collect(ch chan<- prometheus.Metric) { +func (cc IBCCollector) Collect(metric chan<- prometheus.Metric) { log.Debug( "Start collecting", zap.String( @@ -77,7 +105,7 @@ func (cc IBCCollector) Collect(ch chan<- prometheus.Metric) { discordIDs := getDiscordIDs(path.Operators) - // Client info + // cosmos_ibc_client_expiry metric collection ci, err := ibc.GetClientsInfo(ctx, path, cc.RPCs) status := successStatus @@ -87,7 +115,7 @@ func (cc IBCCollector) Collect(ch chan<- prometheus.Metric) { log.Error(err.Error()) } - ch <- prometheus.MustNewConstMetric( + metric <- prometheus.MustNewConstMetric( clientExpiry, prometheus.GaugeValue, float64(ci.ChainAClientExpiration.Unix()), @@ -100,7 +128,7 @@ func (cc IBCCollector) Collect(ch chan<- prometheus.Metric) { }..., ) - ch <- prometheus.MustNewConstMetric( + metric <- prometheus.MustNewConstMetric( clientExpiry, prometheus.GaugeValue, float64(ci.ChainBClientExpiration.Unix()), @@ -113,27 +141,29 @@ func (cc IBCCollector) Collect(ch chan<- prometheus.Metric) { }..., ) - // Stuck packets + // cosmos_ibc_stuck_packets metric collection status = successStatus - stuckPackets, err := ibc.GetChannelsInfo(ctx, path, cc.RPCs) + channelsInfo, err := ibc.GetChannelsInfo(ctx, path, cc.RPCs) if err != nil { status = errorStatus 
log.Error(err.Error()) } - if !reflect.DeepEqual(stuckPackets, ibc.ChannelsInfo{}) { - for _, sp := range stuckPackets.Channels { - ch <- prometheus.MustNewConstMetric( + if !reflect.DeepEqual(channelsInfo, ibc.ChannelsInfo{}) { + for _, chInfo := range channelsInfo.Channels { + metric <- prometheus.MustNewConstMetric( channelStuckPackets, prometheus.GaugeValue, - float64(sp.StuckPackets.Source), + float64(len(chInfo.StuckPackets.Src)), []string{ - sp.Source, - sp.Destination, + chInfo.Source, + chInfo.Destination, (*cc.RPCs)[path.Chain1.ChainName].ChainID, (*cc.RPCs)[path.Chain2.ChainName].ChainID, + strconv.FormatInt(chInfo.StuckPackets.SrcHeight, 10), + strconv.FormatInt(chInfo.StuckPackets.DstHeight, 10), path.Chain1.ChainName, path.Chain2.ChainName, discordIDs, @@ -141,21 +171,30 @@ func (cc IBCCollector) Collect(ch chan<- prometheus.Metric) { }..., ) - ch <- prometheus.MustNewConstMetric( + metric <- prometheus.MustNewConstMetric( channelStuckPackets, prometheus.GaugeValue, - float64(sp.StuckPackets.Destination), + float64(len(chInfo.StuckPackets.Dst)), []string{ - sp.Destination, - sp.Source, + chInfo.Destination, + chInfo.Source, (*cc.RPCs)[path.Chain2.ChainName].ChainID, (*cc.RPCs)[path.Chain1.ChainName].ChainID, + strconv.FormatInt(chInfo.StuckPackets.SrcHeight, 10), + strconv.FormatInt(chInfo.StuckPackets.DstHeight, 10), path.Chain2.ChainName, path.Chain1.ChainName, discordIDs, status, }..., ) + + // cosmos_ibc_new_ack_since_stuck metric collection + err := cc.SetNewAckSinceStuckMetric(ctx, chInfo, path, metric) + + if err != nil { + log.Error(err.Error()) + } } } }(p) @@ -165,3 +204,157 @@ func (cc IBCCollector) Collect(ch chan<- prometheus.Metric) { log.Debug("Stop collecting", zap.String("metric", clientExpiryMetricName)) } + +func (cc IBCCollector) SetNewAckSinceStuckMetric( + ctx context.Context, + chInfo ibc.Channel, + path *config.IBCData, + metric chan<- prometheus.Metric) error { + + // Note: cosmos sdk block height is uint64 but prometheus 
metric type expects float64 + var newAckHeight uint64 + + // Hash key to search for AckProcessor in cc () + srcHashKey := fmt.Sprintf("%s_%s", path.Chain1.ChainName, chInfo.Source) + dstHashKey := fmt.Sprintf("%s_%s", path.Chain2.ChainName, chInfo.Destination) + + // Publish source chain metric + // Start Src chain AckProcessors if stuck packets exist and no processor is running + if _, ok := cc.AckProcessors[srcHashKey]; len(chInfo.StuckPackets.Src) > 0 && !ok { + log.Info("Creating packet processor", zap.String("ChainName", path.Chain1.ChainName), zap.String("ChannelID", chInfo.Source)) + cc.AckProcessors[srcHashKey] = AckProcessor{ + ChainID: (*cc.RPCs)[path.Chain1.ChainName].ChainID, + ChannelID: chInfo.Source, + StartHeight: chInfo.StuckPackets.SrcHeight, + NewAckHeight: make(chan uint64), + } + // set default for newAckHeight + newAckHeight = 0 + + //TODO:start scan for new Acks from start height + + // read from NewAckHeight from AckProcessor + select { + case newAckHeight = <-cc.AckProcessors[srcHashKey].NewAckHeight: + log.Debug("new ack found", zap.String("ChainName", path.Chain1.ChainName), zap.String("ChannelID", chInfo.Source), + zap.Uint64("newAckHeight", newAckHeight)) + default: + log.Debug("no new acks found", zap.String("ChainName", path.Chain1.ChainName), zap.String("ChannelID", chInfo.Source)) + newAckHeight = 0 + } + + } else if _, ok := cc.AckProcessors[srcHashKey]; ok { + // AckProcessor already running, read from NewAckHeight + // set default for newAckHeight + newAckHeight = 0 + + //TODO:start scan for new Acks from start height + + // read from NewAckHeight from AckProcessor + select { + case newAckHeight = <-cc.AckProcessors[srcHashKey].NewAckHeight: + log.Debug("new ack found", zap.String("ChainName", path.Chain1.ChainName), zap.String("ChannelID", chInfo.Source), + zap.Uint64("newAckHeight", newAckHeight)) + default: + log.Debug("no new acks found", zap.String("ChainName", path.Chain1.ChainName), zap.String("ChannelID", chInfo.Source)) 
+ newAckHeight = 0 + } + } else { + // no stuck packets, no processor running, no new acks + newAckHeight = 0 + } + + // Publish source chain metric + // channelNewAckSinceStuck = prometheus.NewDesc( + // channelNewAckSinceStuckMetricName, + // "Returns 1 if new IBC ack was observed since last stuck packet detection, else returns 0.", + // []string{ + // "src_channel_id", + // "dst_channel_id", + // "src_chain_id", + // "dst_chain_id", + // "src_chain_height", + // "src_chain_name", + // "dst_chain_name", + // "status", + // }, + // nil, + // ) + metric <- prometheus.MustNewConstMetric( + channelNewAckSinceStuck, + prometheus.GaugeValue, + float64(newAckHeight), + []string{ + chInfo.Source, + chInfo.Destination, + (*cc.RPCs)[path.Chain1.ChainName].ChainID, + (*cc.RPCs)[path.Chain2.ChainName].ChainID, + path.Chain1.ChainName, + path.Chain2.ChainName, + "success", + }..., + ) + + // Publish destination chain metric + if _, ok := cc.AckProcessors[dstHashKey]; len(chInfo.StuckPackets.Dst) > 0 && !ok { + // Start Dst chain AckProcessors if stuck packets exist and no processor is running + log.Info("Creating packet processor", zap.String("ChainName", path.Chain2.ChainName), zap.String("ChannelID", chInfo.Destination)) + cc.AckProcessors[dstHashKey] = AckProcessor{ + ChainID: (*cc.RPCs)[path.Chain2.ChainName].ChainID, + ChannelID: chInfo.Destination, + StartHeight: chInfo.StuckPackets.DstHeight, + NewAckHeight: make(chan uint64), + } + //start scan for new Acks from start height + + // read from NewAckHeight from AckProcessor + log.Info("creating packet processor", zap.String("ChainName", path.Chain2.ChainName), zap.String("ChannelID", chInfo.Destination)) + + select { + case newAckHeight = <-cc.AckProcessors[dstHashKey].NewAckHeight: + log.Debug("new ack found", zap.String("ChainName", path.Chain2.ChainName), zap.String("ChannelID", chInfo.Destination), + zap.Uint64("newAckHeight", newAckHeight)) + default: + log.Debug("no new acks found", zap.String("ChainName", 
path.Chain2.ChainName), zap.String("ChannelID", chInfo.Destination)) + newAckHeight = 0 + } + + // NOTE(review): removed stray blocking receive <-cc.AckProcessors[path.Chain2.ChainName].NewAckHeight (wrong map key, unbuffered channel with no sender here: it would block Collect forever) + } else if _, ok := cc.AckProcessors[dstHashKey]; ok { + // AckProcessor already running, read from NewAckHeight + // set default for newAckHeight + newAckHeight = 0 + + //TODO:start scan for new Acks from start height + + // read from NewAckHeight from AckProcessor + select { + case newAckHeight = <-cc.AckProcessors[dstHashKey].NewAckHeight: + log.Debug("new ack found", zap.String("ChainName", path.Chain2.ChainName), zap.String("ChannelID", chInfo.Destination), + zap.Uint64("newAckHeight", newAckHeight)) + default: + log.Debug("no new acks found", zap.String("ChainName", path.Chain2.ChainName), zap.String("ChannelID", chInfo.Destination)) + newAckHeight = 0 + } + } else { + // no stuck packets, no processor running, no new acks + newAckHeight = 0 + } + + metric <- prometheus.MustNewConstMetric( + channelNewAckSinceStuck, + prometheus.GaugeValue, + float64(newAckHeight), + []string{ + chInfo.Destination, + chInfo.Source, + (*cc.RPCs)[path.Chain2.ChainName].ChainID, + (*cc.RPCs)[path.Chain1.ChainName].ChainID, + path.Chain2.ChainName, + path.Chain1.ChainName, + "success", + }..., + ) + + return nil +} diff --git a/pkg/ibc/ibc.go b/pkg/ibc/ibc.go index d0541c4..551d8c0 100644 --- a/pkg/ibc/ibc.go +++ b/pkg/ibc/ibc.go @@ -3,16 +3,32 @@ package ibc import ( "context" "fmt" + "sync" "time" + "github.com/avast/retry-go/v4" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer" + "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "go.uber.org/zap" "github.com/archway-network/relayer_exporter/pkg/chain" "github.com/archway-network/relayer_exporter/pkg/config" + log "github.com/archway-network/relayer_exporter/pkg/logger" ) -const stateOpen = 3 +const ( + stateOpen = 3 + rpcTimeout = "10s" + keyringBackend = "test" +) + +var ( + RtyAttNum = uint(5) + RtyAtt = 
retry.Attempts(RtyAttNum) + RtyDel = retry.Delay(time.Millisecond * 400) + RtyErr = retry.LastErrorOnly(true) +) type ClientsInfo struct { ChainA *relayer.Chain @@ -33,10 +49,14 @@ type Channel struct { SourcePort string DestinationPort string Ordering string - StuckPackets struct { - Source int - Destination int - } + StuckPackets UnRelaySequences +} + +type UnRelaySequences struct { + Src []uint64 `json:"src"` + Dst []uint64 `json:"dst"` + SrcHeight int64 `json:"src_height"` + DstHeight int64 `json:"dst_height"` } func GetClientsInfo(ctx context.Context, ibc *config.IBCData, rpcs *map[string]config.RPC) (ClientsInfo, error) { @@ -129,13 +149,6 @@ func GetChannelsInfo(ctx context.Context, ibc *config.IBCData, rpcs *map[string] return ChannelsInfo{}, fmt.Errorf("error: %w for %+v", err, cdB) } - // test that RPC endpoints are working - if _, _, err := relayer.QueryLatestHeights( - ctx, chainA, chainB, - ); err != nil { - return channelInfo, fmt.Errorf("error: %w for %v", err, cdA) - } - for i, c := range channelInfo.Channels { var order chantypes.Order @@ -159,11 +172,302 @@ func GetChannelsInfo(ctx context.Context, ibc *config.IBCData, rpcs *map[string] ChannelId: c.Source, } - unrelayedSequences := relayer.UnrelayedSequences(ctx, chainA, chainB, &ch) + unrelayedSequences, err := UnrelayedSequences(ctx, chainA, chainB, &ch) - channelInfo.Channels[i].StuckPackets.Source += len(unrelayedSequences.Src) - channelInfo.Channels[i].StuckPackets.Destination += len(unrelayedSequences.Dst) + if err != nil { + return ChannelsInfo{}, err + } + + channelInfo.Channels[i].StuckPackets = unrelayedSequences } return channelInfo, nil } + +// UnrelayedSequences returns the unrelayed sequence numbers between two chains at latest block height +func UnrelayedSequences(ctx context.Context, src, dst *relayer.Chain, srcChannel *chantypes.IdentifiedChannel) (UnRelaySequences, error) { + var ( + srcPacketSeq = []uint64{} + dstPacketSeq = []uint64{} + urs = UnRelaySequences{Src: []uint64{}, 
Dst: []uint64{}, SrcHeight: 0, DstHeight: 0} + ) + + srch, dsth, err := relayer.QueryLatestHeights(ctx, src, dst) + if err != nil { + return urs, err + } + + urs.SrcHeight = srch + urs.DstHeight = dsth + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + var ( + res *chantypes.QueryPacketCommitmentsResponse + err error + ) + if err = retry.Do(func() error { + // Query the packet commitment + res, err = src.ChainProvider.QueryPacketCommitments(ctx, uint64(srch), srcChannel.ChannelId, srcChannel.PortId) + switch { + case err != nil: + return err + case res == nil: + return fmt.Errorf("no error on QueryPacketCommitments for %s, however response is nil", src.ChainID()) + default: + return nil + } + }, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { + log.Info( + "Failed to query packet commitments", + zap.String("channel_id", srcChannel.ChannelId), + zap.String("port_id", srcChannel.PortId), + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", RtyAttNum), + zap.Error(err), + ) + })); err != nil { + log.Error( + "Failed to query packet commitments after max retries", + zap.String("channel_id", srcChannel.ChannelId), + zap.String("port_id", srcChannel.PortId), + zap.Uint("attempts", RtyAttNum), + zap.Error(err), + ) + return + } + + for _, pc := range res.Commitments { + srcPacketSeq = append(srcPacketSeq, pc.Sequence) + } + }() + + go func() { + defer wg.Done() + var ( + res *chantypes.QueryPacketCommitmentsResponse + err error + ) + if err = retry.Do(func() error { + res, err = dst.ChainProvider.QueryPacketCommitments(ctx, uint64(dsth), srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId) + switch { + case err != nil: + return err + case res == nil: + return fmt.Errorf("no error on QueryPacketCommitments for %s, however response is nil", dst.ChainID()) + default: + return nil + } + }, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { + log.Info( + "Failed to query 
packet commitments", + zap.String("channel_id", srcChannel.Counterparty.ChannelId), + zap.String("port_id", srcChannel.Counterparty.PortId), + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", RtyAttNum), + zap.Error(err), + ) + })); err != nil { + log.Error( + "Failed to query packet commitments after max retries", + zap.String("channel_id", srcChannel.Counterparty.ChannelId), + zap.String("port_id", srcChannel.Counterparty.PortId), + zap.Uint("attempts", RtyAttNum), + zap.Error(err), + ) + return + } + + for _, pc := range res.Commitments { + dstPacketSeq = append(dstPacketSeq, pc.Sequence) + } + }() + + wg.Wait() + + var ( + srcUnreceivedPackets, dstUnreceivedPackets []uint64 + ) + + if len(srcPacketSeq) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + // Query all packets sent by src that have not been received by dst. + if err := retry.Do(func() error { + var err error + srcUnreceivedPackets, err = dst.ChainProvider.QueryUnreceivedPackets(ctx, uint64(dsth), srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId, srcPacketSeq) + return err + }, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { + log.Info( + "Failed to query unreceived packets", + zap.String("channel_id", srcChannel.Counterparty.ChannelId), + zap.String("port_id", srcChannel.Counterparty.PortId), + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", RtyAttNum), + zap.Error(err), + ) + })); err != nil { + log.Error( + "Failed to query unreceived packets after max retries", + zap.String("channel_id", srcChannel.Counterparty.ChannelId), + zap.String("port_id", srcChannel.Counterparty.PortId), + zap.Uint("attempts", RtyAttNum), + zap.Error(err), + ) + } + }() + } + + if len(dstPacketSeq) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + // Query all packets sent by dst that have not been received by src. 
+ if err := retry.Do(func() error { + var err error + dstUnreceivedPackets, err = src.ChainProvider.QueryUnreceivedPackets(ctx, uint64(srch), srcChannel.ChannelId, srcChannel.PortId, dstPacketSeq) + return err + }, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) { + log.Info( + "Failed to query unreceived packets", + zap.String("channel_id", srcChannel.ChannelId), + zap.String("port_id", srcChannel.PortId), + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", RtyAttNum), + zap.Error(err), + ) + })); err != nil { + log.Error( + "Failed to query unreceived packets after max retries", + zap.String("channel_id", srcChannel.ChannelId), + zap.String("port_id", srcChannel.PortId), + zap.Uint("attempts", RtyAttNum), + zap.Error(err), + ) + return + } + }() + } + wg.Wait() + + // If this is an UNORDERED channel we can return at this point. + if srcChannel.Ordering != chantypes.ORDERED { + urs.Src = srcUnreceivedPackets + urs.Dst = dstUnreceivedPackets + return urs, nil + } + + // For ordered channels we want to only relay the packet whose sequence number is equal to + // the expected next packet receive sequence from the counterparty. 
+ if len(srcUnreceivedPackets) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + nextSeqResp, err := dst.ChainProvider.QueryNextSeqRecv(ctx, dsth, srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId) + if err != nil { + log.Error( + "Failed to query next packet receive sequence", + zap.String("channel_id", srcChannel.Counterparty.ChannelId), + zap.String("port_id", srcChannel.Counterparty.PortId), + zap.Error(err), + ) + return + } + + for _, seq := range srcUnreceivedPackets { + if seq == nextSeqResp.NextSequenceReceive { + urs.Src = append(urs.Src, seq) + break + } + } + }() + } + + if len(dstUnreceivedPackets) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + nextSeqResp, err := src.ChainProvider.QueryNextSeqRecv(ctx, srch, srcChannel.ChannelId, srcChannel.PortId) + if err != nil { + log.Error( + "Failed to query next packet receive sequence", + zap.String("channel_id", srcChannel.ChannelId), + zap.String("port_id", srcChannel.PortId), + zap.Error(err), + ) + return + } + + for _, seq := range dstUnreceivedPackets { + if seq == nextSeqResp.NextSequenceReceive { + urs.Dst = append(urs.Dst, seq) + break + } + } + }() + } + wg.Wait() + + return urs, nil +} + +func ScanForIBCAcksEvents(ctx context.Context, startHeight int64, rpc config.RPC, newAckHeight chan<- int64) { + // track lastQueriedBlockHeight in memory + var lastQueriedBlockHeight int64 = startHeight + + // create chain provider + providerConfig := cosmos.CosmosProviderConfig{ + ChainID: rpc.ChainID, + Timeout: rpc.Timeout, + KeyringBackend: keyringBackend, + RPCAddr: rpc.URL, + } + + // provider will provide grpc client + provider, err := providerConfig.NewProvider(nil, "", false, rpc.ChainID) + if err != nil { + log.Error(err.Error()) + } + + // RPC client to query block results + rpcTimeout, err := time.ParseDuration(rpc.Timeout) + if err != nil { + log.Error(err.Error()) + } + + rpcClient, err := cosmos.NewRPCClient(rpc.URL, rpcTimeout) + if err != nil { + log.Error(err.Error()) + } + 
+ latestHeight, err := provider.QueryLatestHeight(ctx) + + if err != nil { + log.Error(err.Error()) + } + + // scan for IBC acks events + for { + select { + case <-ctx.Done(): + log.Debug("ScanForIBCAcksEvents done") + return + default: + if latestHeight > lastQueriedBlockHeight { + blockResultResp, err := rpcClient.BlockResults(ctx, &lastQueriedBlockHeight) + if err != nil { + log.Error(err.Error()) + } + lastQueriedBlockHeight = lastQueriedBlockHeight + 1 + for _, txResult := range blockResultResp.TxsResults { + for _, event := range txResult.Events { + log.Debug("Event", zap.Int64("Height", blockResultResp.Height), zap.String("Type", event.Type)) + } + } + } + } + } +}