From c4e59b696ce8025c38e5a7af574914465aa82ac4 Mon Sep 17 00:00:00 2001 From: acud <12988138+acud@users.noreply.github.com> Date: Fri, 15 Nov 2024 19:51:17 +0000 Subject: [PATCH] feat: prune ineffectual mempool txs (#6443) ## Motivation Adds support for eviction of ineffectual transactions from the mempool. --- CHANGELOG.md | 2 + api/grpcserver/grpcserver_test.go | 4 ++ api/grpcserver/interface.go | 1 + api/grpcserver/mocks.go | 39 +++++++++++ api/grpcserver/transaction_service.go | 10 ++- genvm/vm.go | 8 +-- go.mod | 2 +- go.sum | 4 +- .../schema/migrations/0026_pruned_txs.sql | 5 ++ sql/statesql/schema/schema.sql | 7 +- sql/transactions/transactions.go | 63 +++++++++++++++++ sql/transactions/transactions_test.go | 70 +++++++++++++++++++ txs/cache.go | 28 ++++++++ txs/cache_test.go | 8 +-- txs/conservative_state.go | 4 ++ 15 files changed, 241 insertions(+), 14 deletions(-) create mode 100644 sql/statesql/schema/migrations/0026_pruned_txs.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 15e9239cdb..18623abce6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions. * [#6422](https://github.com/spacemeshos/go-spacemesh/pull/6422) Further improved performance of the proposal building process to avoid late proposals. +* [#6443](https://github.com/spacemeshos/go-spacemesh/pull/6443) Improve eviction of ineffectual transactions in the database + which will now show up as ineffectual when querying them from the API. * [#6431](https://github.com/spacemeshos/go-spacemesh/pull/6431) Fix db-allow-schema-drift handling diff --git a/api/grpcserver/grpcserver_test.go b/api/grpcserver/grpcserver_test.go index 53a16f2662..4b9e15c0bd 100644 --- a/api/grpcserver/grpcserver_test.go +++ b/api/grpcserver/grpcserver_test.go @@ -315,6 +315,10 @@ func (t *ConStateAPIMock) GetStateRoot() (types.Hash32, error) { return stateRoot, nil } +func (t *ConStateAPIMock) HasEvicted(id types.TransactionID) (bool, error) { + panic("not implemented") +} + func (t *ConStateAPIMock) GetMeshTransaction(id types.TransactionID) (*types.MeshTransaction, error) { tx, ok := t.returnTx[id] if ok { diff --git a/api/grpcserver/interface.go b/api/grpcserver/interface.go index 7b513b9c7e..bfa88836b9 100644 --- a/api/grpcserver/interface.go +++ b/api/grpcserver/interface.go @@ -40,6 +40,7 @@ type conservativeState interface { GetMeshTransactions([]types.TransactionID) ([]*types.MeshTransaction, map[types.TransactionID]struct{}) GetTransactionsByAddress(types.LayerID, types.LayerID, types.Address) ([]*types.MeshTransaction, error) Validation(raw types.RawTx) system.ValidationRequest + HasEvicted(tid types.TransactionID) (bool, error) } // syncer is the API to get sync status. diff --git a/api/grpcserver/mocks.go b/api/grpcserver/mocks.go index 920c5e8bfc..3867be40c6 100644 --- a/api/grpcserver/mocks.go +++ b/api/grpcserver/mocks.go @@ -691,6 +691,45 @@ func (c *MockconservativeStateGetTransactionsByAddressCall) DoAndReturn(f func(t return c } +// HasEvicted mocks base method. +func (m *MockconservativeState) HasEvicted(tid types.TransactionID) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasEvicted", tid) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HasEvicted indicates an expected call of HasEvicted. +func (mr *MockconservativeStateMockRecorder) HasEvicted(tid any) *MockconservativeStateHasEvictedCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasEvicted", reflect.TypeOf((*MockconservativeState)(nil).HasEvicted), tid) + return &MockconservativeStateHasEvictedCall{Call: call} +} + +// MockconservativeStateHasEvictedCall wrap *gomock.Call +type MockconservativeStateHasEvictedCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockconservativeStateHasEvictedCall) Return(arg0 bool, arg1 error) *MockconservativeStateHasEvictedCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockconservativeStateHasEvictedCall) Do(f func(types.TransactionID) (bool, error)) *MockconservativeStateHasEvictedCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockconservativeStateHasEvictedCall) DoAndReturn(f func(types.TransactionID) (bool, error)) *MockconservativeStateHasEvictedCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Validation mocks base method. func (m *MockconservativeState) Validation(raw types.RawTx) system.ValidationRequest { m.ctrl.T.Helper() diff --git a/api/grpcserver/transaction_service.go b/api/grpcserver/transaction_service.go index 16084e81f1..c56f96af8b 100644 --- a/api/grpcserver/transaction_service.go +++ b/api/grpcserver/transaction_service.go @@ -143,7 +143,15 @@ func (s *TransactionService) getTransactionAndStatus( case types.APPLIED: state = pb.TransactionState_TRANSACTION_STATE_PROCESSED default: - state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED + evicted, err := s.conState.HasEvicted(txID) + if err != nil { + return nil, state + } + if evicted { + state = pb.TransactionState_TRANSACTION_STATE_INEFFECTUAL + } else { + state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED + } } return &tx.Transaction, state } diff --git a/genvm/vm.go b/genvm/vm.go index 432a623ac6..5e8671513d 100644 --- a/genvm/vm.go +++ b/genvm/vm.go @@ -231,7 +231,7 @@ func (v *VM) Apply( for _, reward := range rewardsResult { if err := rewards.Add(tx, &reward); err != nil { - return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err) + return nil, nil, fmt.Errorf("add reward %w: %w", core.ErrInternal, err) } } @@ -247,17 +247,17 @@ func (v *VM) Apply( return true }) if err != nil { - return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err) + return nil, nil, fmt.Errorf("iterate changed %w: %w", core.ErrInternal, err) } writesPerBlock.Observe(float64(total)) var hashSum types.Hash32 hasher.Sum(hashSum[:0]) if err := layers.UpdateStateHash(tx, layer, hashSum); err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("update state hash: %w", err) } if err := tx.Commit(); err != nil { - return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err) + return nil, nil, fmt.Errorf("commit %w: %w", core.ErrInternal, err) } ss.IterateChanged(func(account *core.Account) bool { if err := events.ReportAccountUpdate(account.Address); err != nil { diff --git a/go.mod b/go.mod index 64957c60ce..13fff3a813 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/seehuhn/mt19937 v1.0.0 github.com/slok/go-http-metrics v0.13.0 - github.com/spacemeshos/api/release/go v1.55.0 + github.com/spacemeshos/api/release/go v1.56.0 github.com/spacemeshos/economics v0.1.4 github.com/spacemeshos/fixed v0.1.2 github.com/spacemeshos/go-scale v1.2.1 diff --git a/go.sum b/go.sum index 39f51c0530..8f1925e66f 100644 --- a/go.sum +++ b/go.sum @@ -629,8 +629,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= -github.com/spacemeshos/api/release/go v1.55.0 h1:IQ8PmQ1d7CwUiM1r3NH8uZ+JkEyNjSltiAuqEY6dn6o= -github.com/spacemeshos/api/release/go v1.55.0/go.mod h1:qM6GTS2QtUvxPNIJf+2ObH63bGXYrJnapgOd6l6pbpQ= +github.com/spacemeshos/api/release/go v1.56.0 h1:llBVijoO4I3mhHk0OtGJdTT/11I7ajo0CZp3x8h1EjA= +github.com/spacemeshos/api/release/go v1.56.0/go.mod h1:6o17nhNyXpbVeijAQqkZfL8Pe/IkMGAWMLSLZni0DOU= github.com/spacemeshos/economics v0.1.4 h1:twlawrcQhYNqPgyDv08+24EL/OgUKz3d7q+PvJIAND0= github.com/spacemeshos/economics v0.1.4/go.mod h1:6HKWKiKdxjVQcGa2z/wA0LR4M/DzKib856bP16yqNmQ= github.com/spacemeshos/fixed v0.1.2 h1:pENQ8pXFAqin3f15ZLoOVVeSgcmcFJ0IFdFm4+9u4SM= diff --git a/sql/statesql/schema/migrations/0026_pruned_txs.sql b/sql/statesql/schema/migrations/0026_pruned_txs.sql new file mode 100644 index 0000000000..cb8ae8cc0f --- /dev/null +++ b/sql/statesql/schema/migrations/0026_pruned_txs.sql @@ -0,0 +1,5 @@ +CREATE TABLE evicted_mempool ( + id CHAR(32) NOT NULL, + time INT NOT NULL, + PRIMARY KEY (id) +); diff --git a/sql/statesql/schema/schema.sql b/sql/statesql/schema/schema.sql index 913b1f287f..000d87b80a 100755 --- a/sql/statesql/schema/schema.sql +++ b/sql/statesql/schema/schema.sql @@ -1,4 +1,4 @@ -PRAGMA user_version = 25; +PRAGMA user_version = 26; CREATE TABLE accounts ( address CHAR(24), @@ -69,6 +69,11 @@ CREATE TABLE certificates valid bool NOT NULL, PRIMARY KEY (layer, block) ); +CREATE TABLE evicted_mempool ( + id CHAR(32) NOT NULL, + time INT NOT NULL, + PRIMARY KEY (id) +); CREATE TABLE identities ( pubkey CHAR(32) PRIMARY KEY, diff --git a/sql/transactions/transactions.go b/sql/transactions/transactions.go index 215c3369ef..cfab44aadf 100644 --- a/sql/transactions/transactions.go +++ b/sql/transactions/transactions.go @@ -233,6 +233,17 @@ func Has(db sql.Executor, id types.TransactionID) (bool, error) { return rows > 0, nil } +func HasEvicted(db sql.Executor, id types.TransactionID) (bool, error) { + rows, err := db.Exec("select 1 from evicted_mempool where id = ?1", + func(stmt *sql.Statement) { + stmt.BindBytes(1, id.Bytes()) + }, nil) + if err != nil { + return false, fmt.Errorf("has evicted %s: %w", id, err) + } + return rows > 0, nil +} + // GetByAddress finds all transactions for an address. func GetByAddress(db sql.Executor, from, to types.LayerID, address types.Address) ([]*types.MeshTransaction, error) { var txs []*types.MeshTransaction @@ -295,6 +306,58 @@ func GetAcctPendingFromNonce(db sql.Executor, address types.Address, from uint64 }, "get acct pending from nonce") } +// GetAcctPendingToNonce get all pending transactions with nonce before `to` for the given address. +func GetAcctPendingToNonce(db sql.Executor, address types.Address, to uint64) ([]types.TransactionID, error) { + ids := make([]types.TransactionID, 0) + _, err := db.Exec(`select id from transactions + where principal = ?1 and nonce < ?2 and result is null + order by nonce asc, timestamp asc;`, + func(stmt *sql.Statement) { + stmt.BindBytes(1, address.Bytes()) + stmt.BindBytes(2, util.Uint64ToBytesBigEndian(to)) + }, func(stmt *sql.Statement) bool { + id := types.TransactionID{} + stmt.ColumnBytes(0, id[:]) + ids = append(ids, id) + return true + }) + if err != nil { + return nil, fmt.Errorf("get acct pending to nonce %s: %w", address, err) + } + return ids, nil +} + +func SetEvicted(db sql.Executor, id types.TransactionID) error { + if _, err := db.Exec("insert into evicted_mempool (id, time) values (?1, ?2) on conflict do nothing;", + func(stmt *sql.Statement) { + stmt.BindBytes(1, id.Bytes()) + stmt.BindInt64(2, time.Now().UnixNano()) + }, nil); err != nil { + return fmt.Errorf("set evicted %s: %w", id, err) + } + return nil +} + +func Delete(db sql.Executor, id types.TransactionID) error { + if _, err := db.Exec("delete from transactions where id = ?1;", + func(stmt *sql.Statement) { + stmt.BindBytes(1, id.Bytes()) + }, nil); err != nil { + return fmt.Errorf("delete %s: %w", id, err) + } + return nil +} + +func PruneEvicted(db sql.Executor, before time.Time) error { + if _, err := db.Exec("delete from evicted_mempool where time < ?1;", + func(stmt *sql.Statement) { + stmt.BindInt64(1, before.UnixNano()) + }, nil); err != nil { + return fmt.Errorf("prune evicted %w", err) + } + return nil +} + // query MUST ensure that this order of fields tx, header, layer, block, timestamp, id. func queryPending( db sql.Executor, diff --git a/sql/transactions/transactions_test.go b/sql/transactions/transactions_test.go index 0bdac033b3..8818973a5c 100644 --- a/sql/transactions/transactions_test.go +++ b/sql/transactions/transactions_test.go @@ -562,3 +562,73 @@ func TestTransactionInBlock(t *testing.T) { _, _, err = transactions.TransactionInBlock(db, tid, lids[2]) require.ErrorIs(t, err, sql.ErrNotFound) } + +func TestTransactionEvictMempool(t *testing.T) { + principals := []types.Address{ + {1}, + {2}, + {3}, + } + txs := []types.Transaction{ + { + RawTx: types.RawTx{ID: types.TransactionID{1}}, + TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 0}, + }, + { + RawTx: types.RawTx{ID: types.TransactionID{2}}, + TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 1}, + }, + { + RawTx: types.RawTx{ID: types.TransactionID{3}}, + TxHeader: &types.TxHeader{Principal: principals[1], Nonce: 0}, + }, + } + db := statesql.InMemoryTest(t) + for _, tx := range txs { + require.NoError(t, transactions.Add(db, &tx, time.Time{})) + } + err := transactions.SetEvicted(db, types.TransactionID{1}) + require.NoError(t, err) + + err = transactions.Delete(db, types.TransactionID{1}) + require.NoError(t, err) + + pending, err := transactions.GetAcctPendingFromNonce(db, principals[0], 1) + require.NoError(t, err) + require.Len(t, pending, 1) + require.Equal(t, pending[0].ID, txs[1].ID) + + pending, err = transactions.GetAcctPendingFromNonce(db, principals[1], 0) + require.NoError(t, err) + require.Len(t, pending, 1) + require.Equal(t, pending[0].ID, txs[2].ID) + + has, err := transactions.Has(db, txs[0].ID) + require.False(t, has) + require.NoError(t, err) + + has, err = transactions.HasEvicted(db, txs[0].ID) + require.True(t, has) + require.NoError(t, err) +} + +func TestPruneEvicted(t *testing.T) { + txId := types.TransactionID{1} + db := statesql.InMemoryTest(t) + db.Exec(`insert into evicted_mempool (id, time) values (?1,?2);`, + func(stmt *sql.Statement) { + stmt.BindBytes(1, txId.Bytes()) + stmt.BindInt64(2, time.Now().Add(-13*time.Hour).UnixNano()) + }, nil) + + has, err := transactions.HasEvicted(db, txId) + require.True(t, has) + require.NoError(t, err) + + err = transactions.PruneEvicted(db, time.Now().Add(-12*time.Hour)) + require.NoError(t, err) + + has, err = transactions.HasEvicted(db, txId) + require.False(t, has) + require.NoError(t, err) +} diff --git a/txs/cache.go b/txs/cache.go index 0f9d125158..d5aa8516f4 100644 --- a/txs/cache.go +++ b/txs/cache.go @@ -396,9 +396,32 @@ func (ac *accountCache) resetAfterApply( ac.txsByNonce = list.New() ac.startNonce = nextNonce ac.startBalance = newBalance + + err := ac.evictPendingNonce(db) + if err != nil { + return fmt.Errorf("evict pending: %w", err) + } return ac.addPendingFromNonce(logger, db, ac.startNonce, applied) } +func (ac *accountCache) evictPendingNonce(db sql.StateDatabase) error { + return db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error { + txIds, err := transactions.GetAcctPendingToNonce(tx, ac.addr, ac.startNonce) + if err != nil { + return fmt.Errorf("get pending to nonce: %w", err) + } + for _, tid := range txIds { + if err := transactions.SetEvicted(tx, tid); err != nil { + return fmt.Errorf("set evicted for %s: %w", tid, err) + } + if err := transactions.Delete(tx, tid); err != nil { + return fmt.Errorf("delete tx %s: %w", tid, err) + } + } + return nil + }) +} + func (ac *accountCache) shouldEvict() bool { return ac.txsByNonce.Len() == 0 && !ac.moreInDB } @@ -776,6 +799,11 @@ func (c *Cache) ApplyLayer( } acctResetDuration.Observe(float64(time.Since(t2))) } + + err := transactions.PruneEvicted(db, time.Now().Add(-12*time.Hour)) + if err != nil { + logger.Warn("failed to prune evicted", zap.Error(err)) + } return nil } diff --git a/txs/cache_test.go b/txs/cache_test.go index 6d83fdf0e3..e4c66c00eb 100644 --- a/txs/cache_test.go +++ b/txs/cache_test.go @@ -334,10 +334,10 @@ func TestCache_Account_HappyFlow(t *testing.T) { checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance+income) // mempool is unchanged checkMempool(t, tc.Cache, expectedMempool) + + // pruning has removed old and ineffective txs for _, mtx := range append(oldNonces, sameNonces...) { - got, err := transactions.Get(tc.db, mtx.ID) - require.NoError(t, err) - require.Equal(t, types.MEMPOOL, got.State) + checkTXNotInDB(t, tc.db, mtx.ID) } // revert to one layer before lid @@ -357,8 +357,6 @@ func TestCache_Account_HappyFlow(t *testing.T) { } checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance) checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL) - checkTXStateFromDB(t, tc.db, oldNonces, types.MEMPOOL) - checkTXStateFromDB(t, tc.db, sameNonces, types.MEMPOOL) } func TestCache_Account_TXInMultipleLayers(t *testing.T) { diff --git a/txs/conservative_state.go b/txs/conservative_state.go index edfa66afa8..7be6f38f07 100644 --- a/txs/conservative_state.go +++ b/txs/conservative_state.go @@ -283,3 +283,7 @@ func ShuffleWithNonceOrder( }))) return result } + +func (cs *ConservativeState) HasEvicted(tid types.TransactionID) (bool, error) { + return transactions.HasEvicted(cs.db, tid) +}