Skip to content

Commit

Permalink
feat(events): add max results error and default raw codec for eth_get…
Browse files Browse the repository at this point in the history
…Logs (#12671)
  • Loading branch information
akaladarshi authored Nov 20, 2024
1 parent 8a5dc80 commit 4f92348
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
## Bug Fixes

- Make `EthTraceFilter` / `trace_filter` skip null rounds instead of erroring. ([filecoin-project/lotus#12702](https://github.com/filecoin-project/lotus/pull/12702))
- Event APIs (`GetActorEventsRaw`, `SubscribeActorEventsRaw`, `eth_getLogs`, `eth_newFilter`, etc.) will now return an error when a request matches more than `MaxFilterResults` (default: 10,000) rather than silently truncating the results. Also apply an internal event matcher for `eth_getLogs` (etc.) to avoid builtin actor events on database query so as not to include them in `MaxFilterResults` calculation. ([filecoin-project/lotus#12671](https://github.com/filecoin-project/lotus/pull/12671))

## New Features

Expand Down
9 changes: 7 additions & 2 deletions chain/index/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

var ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")

const maxLookBackForWait = 120 // one hour of tipsets

type executedMessage struct {
Expand Down Expand Up @@ -358,9 +360,9 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
if row.id != currentID {
// Unfortunately we can't easily incorporate the max results limit into the query due to the
// unpredictable number of rows caused by joins
// Break here to stop collecting rows
// Error here to inform the caller that we've hit the max results limit
if f.MaxResults > 0 && len(ces) >= f.MaxResults {
break
return nil, ErrMaxResultsReached
}

currentID = row.id
Expand Down Expand Up @@ -563,6 +565,9 @@ func makePrefillFilterQuery(f *EventFilter) ([]any, string, error) {
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
}
} else if f.Codec != 0 { // if no keys are specified, we can use the codec filter
clauses = append(clauses, "ee.codec=?")
values = append(values, f.Codec)
}

s := `SELECT
Expand Down
269 changes: 269 additions & 0 deletions chain/index/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -390,6 +391,257 @@ func TestGetEventsFilterByAddress(t *testing.T) {
}
}

func TestGetEventsForFilterWithRawCodec(t *testing.T) {
ctx := context.Background()
seed := time.Now().UnixNano()
t.Logf("seed: %d", seed)
rng := pseudo.New(pseudo.NewSource(seed))
headHeight := abi.ChainEpoch(60)

// Setup the indexer and chain store with the specified head height
si, _, cs := setupWithHeadIndexed(t, headHeight, rng)
t.Cleanup(func() { _ = si.Close() })

// Define codec constants (replace with actual multicodec values)
var (
codecRaw = multicodec.Raw
codecCBOR = multicodec.Cbor
)

// Create events with different codecs
evRaw1 := fakeEventWithCodec(
abi.ActorID(1),
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr1")},
},
codecRaw,
)

evCBOR := fakeEventWithCodec(
abi.ActorID(2),
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr2")},
},
codecCBOR,
)

evRaw2 := fakeEventWithCodec(
abi.ActorID(3),
[]kv{
{k: "type", v: []byte("transfer")},
{k: "recipient", v: []byte("addr3")},
},
codecRaw,
)

// Aggregate events
events := []types.Event{*evRaw1, *evCBOR, *evRaw2}

// Create a fake message and associate it with the events
fm := fakeMessage(address.TestAddress, address.TestAddress)
em1 := executedMessage{
msg: fm,
evs: events,
}

// Mock the Actor to Address mapping
si.SetActorToDelegatedAddresFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}
return idAddr, true
})

// Mock the executed messages loader
si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
return []executedMessage{em1}, nil
})

// Create fake tipsets
fakeTipSet1 := fakeTipSet(t, rng, 1, nil)
fakeTipSet2 := fakeTipSet(t, rng, 2, nil)

// Associate tipsets with their heights and CIDs
cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // Height 1
cs.SetTipsetByHeightAndKey(2, fakeTipSet2.Key(), fakeTipSet2) // Height 2
cs.SetTipSetByCid(t, fakeTipSet1)
cs.SetTipSetByCid(t, fakeTipSet2)

// Associate messages with tipsets
cs.SetMessagesForTipset(fakeTipSet1, []types.ChainMsg{fm})

// Apply the indexer to process the tipsets
require.NoError(t, si.Apply(ctx, fakeTipSet1, fakeTipSet2))

t.Run("FilterEventsByRawCodecWithoutKeys", func(t *testing.T) {
f := &EventFilter{
MinHeight: 1,
MaxHeight: 2,
Codec: codecRaw, // Set to RAW codec
}

// Retrieve events based on the filter
ces, err := si.GetEventsForFilter(ctx, f)
require.NoError(t, err)

// Define expected collected events (only RAW encoded events)
expectedCES := []*CollectedEvent{
{
Entries: evRaw1.Entries,
EmitterAddr: must.One(address.NewIDAddress(uint64(evRaw1.Emitter))),
EventIdx: 0,
Reverted: false,
Height: 1,
TipSetKey: fakeTipSet1.Key(),
MsgIdx: 0,
MsgCid: fm.Cid(),
},
{
Entries: evRaw2.Entries,
EmitterAddr: must.One(address.NewIDAddress(uint64(evRaw2.Emitter))),
EventIdx: 2, // Adjust based on actual indexing
Reverted: false,
Height: 1,
TipSetKey: fakeTipSet1.Key(),
MsgIdx: 0,
MsgCid: fm.Cid(),
},
}

require.Equal(t, expectedCES, ces)
})
}

func TestMaxFilterResults(t *testing.T) {
ctx := context.Background()
seed := time.Now().UnixNano()
t.Logf("seed: %d", seed)
rng := pseudo.New(pseudo.NewSource(seed))
headHeight := abi.ChainEpoch(60)

// Setup the indexer and chain store with the specified head height
si, _, cs := setupWithHeadIndexed(t, headHeight, rng)
t.Cleanup(func() { _ = si.Close() })

// Define codec constants (replace with actual multicodec values)
var (
codecRaw = multicodec.Raw
codecCBOR = multicodec.Cbor
)

// Create events with different codecs
evRaw1 := fakeEventWithCodec(
abi.ActorID(1),
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr1")},
},
codecRaw,
)

evCBOR := fakeEventWithCodec(
abi.ActorID(2),
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr2")},
},
codecCBOR,
)

evRaw2 := fakeEventWithCodec(
abi.ActorID(3),
[]kv{
{k: "type", v: []byte("transfer")},
{k: "recipient", v: []byte("addr3")},
},
codecRaw,
)

evRaw3 := fakeEventWithCodec(
abi.ActorID(4),
[]kv{
{k: "type", v: []byte("transfer")},
{k: "recipient", v: []byte("addr4")},
},
codecCBOR,
)

// Aggregate events
events := []types.Event{*evRaw1, *evCBOR, *evRaw2, *evRaw3}

// Create a fake message and associate it with the events
fm := fakeMessage(address.TestAddress, address.TestAddress)
em1 := executedMessage{
msg: fm,
evs: events,
}

// Mock the Actor to Address mapping
si.SetActorToDelegatedAddresFunc(func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}
return idAddr, true
})

// Mock the executed messages loader
si.setExecutedMessagesLoaderFunc(func(ctx context.Context, cs ChainStore, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
return []executedMessage{em1}, nil
})

// Create fake tipsets
fakeTipSet1 := fakeTipSet(t, rng, 1, nil)
fakeTipSet2 := fakeTipSet(t, rng, 2, nil)

// Associate tipsets with their heights and CIDs
cs.SetTipsetByHeightAndKey(1, fakeTipSet1.Key(), fakeTipSet1) // Height 1
cs.SetTipsetByHeightAndKey(2, fakeTipSet2.Key(), fakeTipSet2) // Height 2
cs.SetTipSetByCid(t, fakeTipSet1)
cs.SetTipSetByCid(t, fakeTipSet2)

// Associate messages with tipsets
cs.SetMessagesForTipset(fakeTipSet1, []types.ChainMsg{fm})

// Apply the indexer to process the tipsets
require.NoError(t, si.Apply(ctx, fakeTipSet1, fakeTipSet2))

// if we hit max results, we should get an error
// we have total 4 events
testCases := []struct {
name string
maxResults int
expectedCount int
expectedErr string
}{
{name: "no max results", maxResults: 0, expectedCount: 4},
{name: "max result more that total events", maxResults: 10, expectedCount: 4},
{name: "max results less than total events", maxResults: 1, expectedErr: ErrMaxResultsReached.Error()},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
f := &EventFilter{
MinHeight: 1,
MaxHeight: 2,
MaxResults: tc.maxResults,
}

ces, err := si.GetEventsForFilter(ctx, f)
if tc.expectedErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.expectedErr)
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedCount, len(ces))
}
})
}
}

func sortAddresses(addrs []address.Address) {
sort.Slice(addrs, func(i, j int) bool {
return addrs[i].String() < addrs[j].String()
Expand Down Expand Up @@ -435,6 +687,23 @@ func fakeEvent(emitter abi.ActorID, indexed []kv, unindexed []kv) *types.Event {
return ev
}

func fakeEventWithCodec(emitter abi.ActorID, indexed []kv, codec multicodec.Code) *types.Event {
ev := &types.Event{
Emitter: emitter,
}

for _, in := range indexed {
ev.Entries = append(ev.Entries, types.EventEntry{
Flags: 0x01,
Key: in.k,
Codec: uint64(codec),
Value: in.v,
})
}

return ev
}

type kv struct {
k string
v []byte
Expand Down
3 changes: 3 additions & 0 deletions chain/index/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -47,6 +48,8 @@ type EventFilter struct {

KeysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match
MaxResults int // maximum number of results to collect, 0 is unlimited

Codec multicodec.Code // optional codec filter, only used if KeysWithCodec is not set
}

type Indexer interface {
Expand Down
1 change: 1 addition & 0 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1784,6 +1784,7 @@ func (e *EthEventHandler) ethGetEventsForFilter(ctx context.Context, filterSpec
TipsetCid: pf.tipsetCid,
Addresses: pf.addresses,
KeysWithCodec: pf.keys,
Codec: multicodec.Raw,
MaxResults: e.EventFilterManager.MaxFilterResults,
}

Expand Down

0 comments on commit 4f92348

Please sign in to comment.