diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index cc07ebb05..dda856235 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -122,7 +122,8 @@ const ( const ( EventAttributeSequenceNumber = "SequenceNumber" EventAttributeSourceChain = "SourceChain" - EventAttributeDestChain = "destChain" + EventAttributeDestChain = "DestChain" + EventAttributeState = "State" ) // Dedicated filters diff --git a/pkg/reader/ccip.go b/pkg/reader/ccip.go index 3d3519bb1..d38380c58 100644 --- a/pkg/reader/ccip.go +++ b/pkg/reader/ccip.go @@ -127,6 +127,7 @@ func (r *ccipChainReader) CommitReportsGTETimestamp( query.KeyFilter{ Key: consts.EventNameCommitReportAccepted, Expressions: []query.Expression{ + query.Timestamp(uint64(ts.Unix()), primitives.Gte), query.Confidence(primitives.Finalized), }, }, @@ -150,15 +151,6 @@ func (r *ccipChainReader) CommitReportsGTETimestamp( return nil, fmt.Errorf("unexpected type %T while expecting a commit report", item) } - valid := item.Timestamp >= uint64(ts.Unix()) - if !valid { - r.lggr.Debugw("commit report too old, skipping", "report", ev, "item", item, - "destChain", dest, - "ts", ts, - "limit", limit) - continue - } - r.lggr.Debugw("processing commit report", "report", ev, "item", item) merkleRoots := make([]cciptypes.MerkleRootChain, 0, len(ev.MerkleRoots)) @@ -245,6 +237,21 @@ func (r *ccipChainReader) ExecutedMessageRanges( query.KeyFilter{ Key: consts.EventNameExecutionStateChanged, Expressions: []query.Expression{ + query.Comparator(consts.EventAttributeSourceChain, primitives.ValueComparator{ + Value: source, + Operator: primitives.Eq, + }), + query.Comparator(consts.EventAttributeSequenceNumber, primitives.ValueComparator{ + Value: seqNumRange.Start(), + Operator: primitives.Gte, + }, primitives.ValueComparator{ + Value: seqNumRange.End(), + Operator: primitives.Lte, + }), + query.Comparator(consts.EventAttributeState, primitives.ValueComparator{ + Value: 0, + Operator: primitives.Gt, + }), query.Confidence(primitives.Finalized), }, }, @@ -263,17 +270,6 @@ func (r *ccipChainReader) ExecutedMessageRanges( if !ok { return nil, fmt.Errorf("failed to cast %T to executionStateChangedEvent", item.Data) } - - // todo: filter via the query - valid := stateChange.SourceChainSelector == source && - stateChange.SequenceNumber >= seqNumRange.Start() && - stateChange.SequenceNumber <= seqNumRange.End() && - stateChange.State > 0 - if !valid { - r.lggr.Debugw("skipping invalid state change", "stateChange", stateChange) - continue - } - executed = append(executed, cciptypes.NewSeqNumRange(stateChange.SequenceNumber, stateChange.SequenceNumber)) } @@ -353,6 +349,21 @@ func (r *ccipChainReader) MsgsBetweenSeqNums( query.KeyFilter{ Key: consts.EventNameCCIPMessageSent, Expressions: []query.Expression{ + query.Comparator(consts.EventAttributeSourceChain, primitives.ValueComparator{ + Value: sourceChainSelector, + Operator: primitives.Eq, + }), + query.Comparator(consts.EventAttributeDestChain, primitives.ValueComparator{ + Value: r.destChain, + Operator: primitives.Eq, + }), + query.Comparator(consts.EventAttributeSequenceNumber, primitives.ValueComparator{ + Value: seqNumRange.Start(), + Operator: primitives.Gte, + }, primitives.ValueComparator{ + Value: seqNumRange.End(), + Operator: primitives.Lte, + }), query.Confidence(primitives.Finalized), }, }, @@ -380,17 +391,8 @@ func (r *ccipChainReader) MsgsBetweenSeqNums( return nil, fmt.Errorf("failed to cast %v to Message", item.Data) } - // todo: filter via the query - valid := msg.Message.Header.SourceChainSelector == sourceChainSelector && - msg.Message.Header.DestChainSelector == r.destChain && - msg.Message.Header.SequenceNumber >= seqNumRange.Start() && - msg.Message.Header.SequenceNumber <= seqNumRange.End() - msg.Message.Header.OnRamp = onRampAddress - - if valid { - msgs = append(msgs, msg.Message.ToMessage()) - } + msgs = append(msgs, msg.Message.ToMessage()) } r.lggr.Infow("decoded messages between sequence numbers", "msgs", msgs,