Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[factory] simplify interface for archive mode #4474

Merged
merged 3 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
if height != "" {
inputHeight, err := strconv.ParseUint(height, 0, 64)
if err != nil {
return nil, uint64(0), err
return nil, 0, err
}
rp := rolldpos.FindProtocol(core.registry)
if rp != nil {
Expand All @@ -961,7 +961,11 @@ func (core *coreService) readState(ctx context.Context, p protocol.Protocol, hei
}
if inputHeight < tipHeight {
// old data, wrap to history state reader
d, h, err := p.ReadState(ctx, factory.NewHistoryStateReader(core.sf, inputHeight), methodName, arguments...)
historySR, err := core.sf.WorkingSetAtHeight(ctx, inputHeight)
if err != nil {
return nil, 0, err
}
d, h, err := p.ReadState(ctx, historySR, methodName, arguments...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can combine codes after getting the specified StateReader

if err == nil {
key.Height = strconv.FormatUint(h, 10)
core.readCache.Put(key.Hash(), d)
Expand Down
1 change: 1 addition & 0 deletions api/serverV2_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func addActsToActPool(ctx context.Context, ap actpool.ActPool) error {

func setupChain(cfg testConfig) (blockchain.Blockchain, blockdao.BlockDAO, blockindex.Indexer, blockindex.BloomFilterIndexer, factory.Factory, actpool.ActPool, *protocol.Registry, string, error) {
cfg.chain.ProducerPrivKey = hex.EncodeToString(identityset.PrivateKey(0).Bytes())
cfg.chain.EnableArchiveMode = true
registry := protocol.NewRegistry()
factoryCfg := factory.GenerateConfig(cfg.chain, cfg.genesis)
sf, err := factory.NewFactory(factoryCfg, db.NewMemKVStore(), factory.RegistryOption(registry))
Expand Down
17 changes: 9 additions & 8 deletions blockchain/integrity/integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2355,14 +2355,14 @@ func testHistoryForAccount(t *testing.T, statetx bool) {

// check history account's balance
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
_, err = sf.WorkingSetAtHeight(ctx, 0)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
AccountA, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), a)
sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.NoError(err)
AccountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(err)
AccountB, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), b)
AccountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(err)
require.Equal(big.NewInt(100), AccountA.Balance)
require.Equal(big.NewInt(100), AccountB.Balance)
Expand Down Expand Up @@ -2403,10 +2403,11 @@ func testHistoryForContract(t *testing.T, statetx bool) {

// check the the original balance again
if statetx {
_, err = accountutil.AccountState(ctx, factory.NewHistoryStateReader(sf, bc.TipHeight()-1), contractAddr)
require.True(errors.Cause(err) == factory.ErrNotSupported)
_, err = sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.Equal(factory.ErrNotSupported, errors.Cause(err))
} else {
sr := factory.NewHistoryStateReader(sf, bc.TipHeight()-1)
sr, err := sf.WorkingSetAtHeight(ctx, bc.TipHeight()-1)
require.NoError(err)
account, err = accountutil.AccountState(ctx, sr, contractAddr)
require.NoError(err)
balance = BalanceOfContract(contract, genesisAccount, sr, t, account.Root)
Expand Down
104 changes: 32 additions & 72 deletions state/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type (
NewBlockBuilder(context.Context, actpool.ActPool, func(action.Envelope) (*action.SealedEnvelope, error)) (*block.Builder, error)
PutBlock(context.Context, *block.Block) error
DeleteTipBlock(context.Context, *block.Block) error
StateAtHeight(uint64, interface{}, ...protocol.StateOption) error
StatesAtHeight(uint64, ...protocol.StateOption) (state.Iterator, error)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these 2 are included in WorkingSetAtHeight funcs

WorkingSet(context.Context) (protocol.StateManager, error)
WorkingSetAtHeight(context.Context, uint64) (protocol.StateManager, error)
}
Expand Down Expand Up @@ -272,6 +270,31 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
if err != nil {
return nil, err
}
return sf.createSfWorkingSet(ctx, height, store)
}

func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*workingSet, error) {
span := tracer.SpanFromContext(ctx)
span.AddEvent("factory.newWorkingSet")
defer span.End()

g := genesis.MustExtractGenesisContext(ctx)
flusher, err := db.NewKVStoreFlusher(
sf.dao,
batch.NewCachedBatch(),
sf.flusherOptions(!g.IsEaster(height))...,
)
if err != nil {
return nil, err
}
store, err := newFactoryWorkingSetStoreAtHeight(sf.protocolView, flusher, height)
if err != nil {
return nil, err
}
return sf.createSfWorkingSet(ctx, height, store)
}

func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore) (*workingSet, error) {
if err := store.Start(ctx); err != nil {
return nil, err
}
Expand All @@ -286,7 +309,6 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe
}
}
}

return newWorkingSet(height, store), nil
}

Expand Down Expand Up @@ -388,16 +410,20 @@ func (sf *factory) WorkingSet(ctx context.Context) (protocol.StateManager, error
}

func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
if !sf.saveHistory {
return nil, ErrNoArchiveData
}
sf.mutex.Lock()
defer sf.mutex.Unlock()
return sf.newWorkingSet(ctx, height+1)
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.newWorkingSetAtHeight(ctx, height)
}

// PutBlock persists all changes in RunActions() into the DB
func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error {
sf.mutex.Lock()
timer := sf.timerFactory.NewTimer("Commit")
sf.mutex.Unlock()
Copy link
Member Author

@dustinxie dustinxie Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when reviewing the usage of sf.mutex, found it's not really needed here

defer timer.End()
producer := blk.PublicKey().Address()
if producer == nil {
Expand Down Expand Up @@ -456,52 +482,6 @@ func (sf *factory) DeleteTipBlock(_ context.Context, _ *block.Block) error {
return errors.Wrap(ErrNotSupported, "cannot delete tip block from factory")
}

// StateAtHeight returns a confirmed state at height -- archive mode
func (sf *factory) StateAtHeight(height uint64, s interface{}, opts ...protocol.StateOption) error {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
cfg, err := processOptions(opts...)
if err != nil {
return err
}
if cfg.Keys != nil {
return errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
if height > sf.currentChainHeight {
return errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
return sf.stateAtHeight(height, cfg.Namespace, cfg.Key, s)
}

// StatesAtHeight returns a set states in the state factory at height -- archive mode
func (sf *factory) StatesAtHeight(height uint64, opts ...protocol.StateOption) (state.Iterator, error) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
if height > sf.currentChainHeight {
return nil, errors.Errorf("query height %d is higher than tip height %d", height, sf.currentChainHeight)
}
cfg, err := processOptions(opts...)
if err != nil {
return nil, err
}
if cfg.Keys != nil {
return nil, errors.Wrap(ErrNotSupported, "Read states with keys option has not been implemented yet")
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return nil, errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return nil, err
}
defer tlt.Stop(context.Background())
keys, values, err := readStatesFromTLT(tlt, cfg.Namespace, cfg.Keys)
if err != nil {
return nil, err
}
return state.NewIterator(keys, values)
}

Copy link
Member Author

@dustinxie dustinxie Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

included in WorkingSetAtHeight funcs

// State returns a confirmed state in the state factory
func (sf *factory) State(s interface{}, opts ...protocol.StateOption) (uint64, error) {
sf.mutex.RLock()
Expand Down Expand Up @@ -573,26 +553,6 @@ func legacyKeyLen() int {
return 20
}

func (sf *factory) stateAtHeight(height uint64, ns string, key []byte, s interface{}) error {
if !sf.saveHistory {
return ErrNoArchiveData
}
tlt, err := newTwoLayerTrie(ArchiveTrieNamespace, sf.dao, fmt.Sprintf("%s-%d", ArchiveTrieRootKey, height), false)
if err != nil {
return errors.Wrapf(err, "failed to generate trie for %d", height)
}
if err := tlt.Start(context.Background()); err != nil {
return err
}
defer tlt.Stop(context.Background())

value, err := readStateFromTLT(tlt, ns, key)
if err != nil {
return err
}
return state.Deserialize(s, value)
}

func (sf *factory) createGenesisStates(ctx context.Context) error {
ws, err := sf.newWorkingSet(ctx, 0)
if err != nil {
Expand Down
37 changes: 23 additions & 14 deletions state/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ func TestState(t *testing.T) {

func TestHistoryState(t *testing.T) {
r := require.New(t)
var err error
// using factory and enable history
cfg := DefaultConfig
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file1, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file1
cfg.Chain.EnableArchiveMode = true
db1, err := db.CreateKVStore(db.DefaultConfig, cfg.Chain.TrieDBPath)
r.NoError(err)
Expand All @@ -372,17 +372,19 @@ func TestHistoryState(t *testing.T) {
testHistoryState(sf, t, false, cfg.Chain.EnableArchiveMode)

// using stateDB and enable history
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file2, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file2
db2, err := db.CreateKVStoreWithCache(db.DefaultConfig, cfg.Chain.TrieDBPath, cfg.Chain.StateDBCacheSize)
r.NoError(err)
sf, err = NewStateDB(cfg, db2, SkipBlockValidationStateDBOption())
r.NoError(err)
testHistoryState(sf, t, true, cfg.Chain.EnableArchiveMode)

// using factory and disable history
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file3, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file3
cfg.Chain.EnableArchiveMode = false
db1, err = db.CreateKVStore(db.DefaultConfig, cfg.Chain.TrieDBPath)
r.NoError(err)
Expand All @@ -391,15 +393,19 @@ func TestHistoryState(t *testing.T) {
testHistoryState(sf, t, false, cfg.Chain.EnableArchiveMode)

// using stateDB and disable history
cfg.Chain.TrieDBPath, err = testutil.PathOfTempFile(_triePath)
file4, err := testutil.PathOfTempFile(_triePath)
r.NoError(err)
cfg.Chain.TrieDBPath = file4
db2, err = db.CreateKVStoreWithCache(db.DefaultConfig, cfg.Chain.TrieDBPath, cfg.Chain.StateDBCacheSize)
r.NoError(err)
sf, err = NewStateDB(cfg, db2, SkipBlockValidationStateDBOption())
r.NoError(err)
testHistoryState(sf, t, true, cfg.Chain.EnableArchiveMode)
defer func() {
testutil.CleanupPath(cfg.Chain.TrieDBPath)
testutil.CleanupPath(file1)
testutil.CleanupPath(file2)
testutil.CleanupPath(file3)
testutil.CleanupPath(file4)
}()
}

Expand Down Expand Up @@ -567,21 +573,24 @@ func testHistoryState(sf Factory, t *testing.T, statetx, archive bool) {

// check archive data
if statetx {
// statetx not support archive mode
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNotSupported, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
// statetx not support archive mode yet
_, err = sf.WorkingSetAtHeight(ctx, 0)
require.Equal(t, ErrNotSupported, errors.Cause(err))
} else {
_, err = sf.WorkingSetAtHeight(ctx, 10)
if !archive {
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
_, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
} else {
require.Contains(t, err.Error(), "query height 10 is higher than tip height 1")
}
sr, err := sf.WorkingSetAtHeight(ctx, 0)
if !archive {
require.Equal(t, ErrNoArchiveData, errors.Cause(err))
} else {
accountA, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, NewHistoryStateReader(sf, 0), b)
accountA, err = accountutil.AccountState(ctx, sr, a)
require.NoError(t, err)
accountB, err = accountutil.AccountState(ctx, sr, b)
require.NoError(t, err)
require.Equal(t, big.NewInt(100), accountA.Balance)
require.Equal(t, big.NewInt(0), accountB.Balance)
Expand Down
49 changes: 0 additions & 49 deletions state/factory/historyfactory.go

This file was deleted.

3 changes: 2 additions & 1 deletion state/factory/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ func (sdb *stateDB) WorkingSet(ctx context.Context) (protocol.StateManager, erro
}

func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64) (protocol.StateManager, error) {
return sdb.newWorkingSet(ctx, height+1)
// TODO: implement archive mode
return nil, ErrNotSupported
}

// PutBlock persists all changes in RunActions() into the DB
Expand Down
3 changes: 3 additions & 0 deletions state/factory/workingset.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ func (ws *workingSet) State(s interface{}, opts ...protocol.StateOption) (uint64
if err != nil {
return ws.height, err
}
if cfg.Keys != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add this?

Copy link
Member Author

@dustinxie dustinxie Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's in the removed stateAtHeight() func, it's a safety check, there's a similar check in States() at L361

return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet")
}
value, err := ws.store.Get(cfg.Namespace, cfg.Key)
if err != nil {
return ws.height, err
Expand Down
4 changes: 3 additions & 1 deletion state/factory/workingset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func newFactoryWorkingSet(t testing.TB) *workingSet {
genesis.Default,
)
r.NoError(sf.Start(ctx))
// defer r.NoError(sf.Stop(ctx))
defer func() {
r.NoError(sf.Stop(ctx))
}()

ws, err := sf.(workingSetCreator).newWorkingSet(ctx, 1)
r.NoError(err)
Expand Down
Loading
Loading