fix(eds/blockstore): Puts on EDSStore Blockstore (celestiaorg#2532)
Closes celestiaorg#2424 

Alternative Design Discussions:
- Using an in-memory blockstore instead of an on-disk one: during an
unexpected shutdown, or in the case of AsyncGetter, cleanup would not
occur and blocks would be stuck in the store indefinitely. An in-memory
blockstore would clear this state on restart.
- Using a local Blockgetter to pass to NewErrByzantine: instead of
putting the blocks into the EDS blockstore, the retrieval session could
build an in-memory blockstore on the fly and hand it to NewErrByzantine
when needed. This would be cleaner in the code and make more sense
architecturally, but it would mean full nodes could not share these
shares with each other during reconstruction. A minimal sketch of the
datastore-backed approach this commit takes is included below.
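
For illustration only (not part of the commit itself), here is a minimal sketch of the datastore-backed path this change takes: blocks received over bitswap are keyed by their multihash and written under the store's "bs-cache" namespace, and reads fall back to that datastore when the shard index has no entry. The standalone main wrapper and the sample data are assumptions made to keep the example self-contained; the real logic lives in share/eds/blockstore.go below.

```go
package main

import (
	"context"
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
)

func main() {
	ctx := context.Background()

	// Stand-in for the EDS store's datastore, wrapped under the same
	// "bs-cache" namespace used by newBlockstore in this commit.
	ds := namespace.Wrap(datastore.NewMapDatastore(), datastore.NewKey("bs-cache"))

	// A block received over bitswap during reconstruction.
	blk := blocks.NewBlock([]byte("sample share data"))

	// Put: key the raw data by multihash, as blockstore.Put now does.
	key := dshelp.MultihashToDsKey(blk.Cid().Hash())
	if err := ds.Put(ctx, key, blk.RawData()); err != nil {
		panic(err)
	}

	// Get fallback: when the shard index has no entry for the CID, the
	// blockstore re-reads the raw data and rebuilds the block with its CID.
	data, err := ds.Get(ctx, key)
	if err != nil {
		panic(err)
	}
	restored, err := blocks.NewBlockWithCid(data, blk.Cid())
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.Cid().Equals(blk.Cid())) // true
}
```

Because the fallback lives in the on-disk datastore rather than in memory, blocks put this way survive restarts and can be served to other full nodes during reconstruction, at the cost of requiring explicit cleanup.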
distractedm1nd authored Aug 28, 2023
1 parent 4b96022 commit 196e849
Showing 7 changed files with 221 additions and 28 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -202,7 +202,7 @@ require (
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
151 changes: 151 additions & 0 deletions nodebuilder/tests/reconstruct_test.go
@@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

@@ -81,6 +82,156 @@ func TestFullReconstructFromBridge(t *testing.T) {
require.NoError(t, errg.Wait())
}

/*
Test-Case: Full Nodes reconstruct blocks from each other after unsuccessfully syncing the complete
block from LN subnetworks. Analogous to TestShareAvailable_DisconnectedFullNodes.
*/
func TestFullReconstructFromFulls(t *testing.T) {
light.DefaultSampleAmount = 10 // s
const (
blocks = 10
btime = time.Millisecond * 300
bsize = 8 // k
lnodes = 12 // c - total number of nodes on two subnetworks
)

ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
t.Cleanup(cancel)

sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)

const defaultTimeInterval = time.Second * 5
bridge := sw.NewBridgeNode()

sw.SetBootstrapper(t, bridge)
require.NoError(t, bridge.Start(ctx))

// TODO: This is required to avoid flakes coming from unfinished retry
// mechanism for the same peer in go-header
_, err := bridge.HeaderServ.WaitForHeight(ctx, uint64(blocks))
require.NoError(t, err)

lights1 := make([]*nodebuilder.Node, lnodes/2)
lights2 := make([]*nodebuilder.Node, lnodes/2)
subs := make([]event.Subscription, lnodes)
errg, errCtx := errgroup.WithContext(ctx)
for i := 0; i < lnodes/2; i++ {
i := i
errg.Go(func() error {
lnConfig := nodebuilder.DefaultConfig(node.Light)
setTimeInterval(lnConfig, defaultTimeInterval)
light := sw.NewNodeWithConfig(node.Light, lnConfig)
sub, err := light.Host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
if err != nil {
return err
}
subs[i] = sub
lights1[i] = light
return light.Start(errCtx)
})
errg.Go(func() error {
lnConfig := nodebuilder.DefaultConfig(node.Light)
setTimeInterval(lnConfig, defaultTimeInterval)
light := sw.NewNodeWithConfig(node.Light, lnConfig)
sub, err := light.Host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
if err != nil {
return err
}
subs[(lnodes/2)+i] = sub
lights2[i] = light
return light.Start(errCtx)
})
}

require.NoError(t, errg.Wait())

for i := 0; i < lnodes; i++ {
select {
case <-ctx.Done():
t.Fatal("peer was not found")
case <-subs[i].Out():
require.NoError(t, subs[i].Close())
continue
}
}

// Remove bootstrappers to prevent FNs from connecting to bridge
sw.Bootstrappers = []ma.Multiaddr{}
// Use light nodes from respective subnetworks as bootstrappers to prevent connection to bridge
lnBootstrapper1, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(lights1[0].Host))
require.NoError(t, err)
lnBootstrapper2, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(lights2[0].Host))
require.NoError(t, err)

cfg := nodebuilder.DefaultConfig(node.Full)
setTimeInterval(cfg, defaultTimeInterval)
cfg.Share.UseShareExchange = false
cfg.Share.Discovery.PeersLimit = 0
cfg.Header.TrustedPeers = []string{lnBootstrapper1[0].String()}
full1 := sw.NewNodeWithConfig(node.Full, cfg)
cfg.Header.TrustedPeers = []string{lnBootstrapper2[0].String()}
full2 := sw.NewNodeWithConfig(node.Full, cfg)
require.NoError(t, full1.Start(ctx))
require.NoError(t, full2.Start(ctx))

// Form topology
for i := 0; i < lnodes/2; i++ {
// Separate light nodes into two subnetworks
for j := 0; j < lnodes/2; j++ {
sw.Disconnect(t, lights1[i], lights2[j])
if i != j {
sw.Connect(t, lights1[i], lights1[j])
sw.Connect(t, lights2[i], lights2[j])
}
}

sw.Connect(t, full1, lights1[i])
sw.Disconnect(t, full1, lights2[i])

sw.Connect(t, full2, lights2[i])
sw.Disconnect(t, full2, lights1[i])
}

// Ensure the fulls are not connected to the bridge
sw.Disconnect(t, full1, full2)
sw.Disconnect(t, full1, bridge)
sw.Disconnect(t, full2, bridge)

h, err := full1.HeaderServ.WaitForHeight(ctx, uint64(10+blocks-1))
require.NoError(t, err)

// Ensure that the full nodes cannot reconstruct before being connected to each other
ctxErr, cancelErr := context.WithTimeout(ctx, time.Second*30)
errg, errCtx = errgroup.WithContext(ctxErr)
errg.Go(func() error {
return full1.ShareServ.SharesAvailable(errCtx, h.DAH)
})
errg.Go(func() error {
return full2.ShareServ.SharesAvailable(errCtx, h.DAH)
})
require.Error(t, errg.Wait())
cancelErr()

// Reconnect FNs
sw.Connect(t, full1, full2)

errg, bctx := errgroup.WithContext(ctx)
for i := 10; i < blocks+11; i++ {
h, err := full1.HeaderServ.WaitForHeight(bctx, uint64(i))
require.NoError(t, err)
errg.Go(func() error {
return full1.ShareServ.SharesAvailable(bctx, h.DAH)
})
errg.Go(func() error {
return full2.ShareServ.SharesAvailable(bctx, h.DAH)
})
}

require.NoError(t, <-fillDn)
require.NoError(t, errg.Wait())
}

/*
Test-Case: Full Node reconstructs blocks only from Light Nodes
Pre-Reqs:
77 changes: 53 additions & 24 deletions share/eds/blockstore.go
@@ -10,12 +10,15 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dshelp "github.com/ipfs/go-ipfs-ds-help"
ipld "github.com/ipfs/go-ipld-format"
)

var _ bstore.Blockstore = (*blockstore)(nil)

var (
blockstoreCacheKey = datastore.NewKey("bs-cache")
errUnsupportedOperation = errors.New("unsupported operation")
)

@@ -30,29 +33,42 @@ var (
type blockstore struct {
store *Store
cache *blockstoreCache
ds datastore.Batching
}

func newBlockstore(store *Store, cache *blockstoreCache) *blockstore {
func newBlockstore(store *Store, cache *blockstoreCache, ds datastore.Batching) *blockstore {
return &blockstore{
store: store,
cache: cache,
ds: namespace.Wrap(ds, blockstoreCacheKey),
}
}

func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, ErrNotFound) {
return false, nil
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
// key wasn't found in top level blockstore, but could be in datastore while being reconstructed
dsHas, dsErr := bs.ds.Has(ctx, dshelp.MultihashToDsKey(cid.Hash()))
if dsErr != nil {
return false, nil
}
return dsHas, nil
}
if err != nil {
return false, fmt.Errorf("failed to find shards containing multihash: %w", err)
return false, err
}

return len(keys) > 0, nil
}

func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if errors.Is(err, ErrNotFound) {
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
blockData, err := bs.ds.Get(ctx, k)
if err == nil {
return blocks.NewBlockWithCid(blockData, cid)
}
// nmt's GetNode expects an ipld.ErrNotFound when a cid is not found.
return nil, ipld.ErrNotFound{Cid: cid}
}
@@ -65,7 +81,12 @@ func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error

func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
blockstr, err := bs.getReadOnlyBlockstore(ctx, cid)
if errors.Is(err, ErrNotFound) {
if errors.Is(err, ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
k := dshelp.MultihashToDsKey(cid.Hash())
size, err := bs.ds.GetSize(ctx, k)
if err == nil {
return size, nil
}
// nmt's GetSize expects an ipld.ErrNotFound when a cid is not found.
return 0, ipld.ErrNotFound{Cid: cid}
}
@@ -75,27 +96,35 @@ func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
return blockstr.GetSize(ctx, cid)
}

// DeleteBlock is a noop on the EDS blockstore that returns an errUnsupportedOperation when called.
func (bs *blockstore) DeleteBlock(context.Context, cid.Cid) error {
return errUnsupportedOperation
func (bs *blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
k := dshelp.MultihashToDsKey(cid.Hash())
return bs.ds.Delete(ctx, k)
}

// Put is a noop on the EDS blockstore, but it does not return an error because it is called by
// bitswap. For clarification, an implementation of Put does not make sense in this context because
// it is unclear which CAR file the block should be written to.
//
// TODO: throw errUnsupportedOperation after issue #1440
func (bs *blockstore) Put(context.Context, blocks.Block) error {
return nil
func (bs *blockstore) Put(ctx context.Context, blk blocks.Block) error {
k := dshelp.MultihashToDsKey(blk.Cid().Hash())
// note: we leave duplicate resolution to the underlying datastore
return bs.ds.Put(ctx, k, blk.RawData())
}

// PutMany is a noop on the EDS blockstore, but it does not return an error because it is called by
// bitswap. For clarification, an implementation of PutMany does not make sense in this context
// because it is unclear which CAR file the blocks should be written to.
//
// TODO: throw errUnsupportedOperation after issue #1440
func (bs *blockstore) PutMany(context.Context, []blocks.Block) error {
return nil
func (bs *blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if len(blocks) == 1 {
// performance fast-path
return bs.Put(ctx, blocks[0])
}

t, err := bs.ds.Batch(ctx)
if err != nil {
return err
}
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())
err = t.Put(ctx, k, b.RawData())
if err != nil {
return err
}
}
return t.Commit(ctx)
}

// AllKeysChan is a noop on the EDS blockstore because the keys are not stored in a single CAR file.
@@ -112,7 +141,7 @@ func (bs *blockstore) HashOnRead(bool) {
// getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID.
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) {
keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
if errors.Is(err, datastore.ErrNotFound) {
if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
return nil, ErrNotFound
}
if err != nil {
6 changes: 5 additions & 1 deletion share/eds/inverted_index.go
@@ -2,6 +2,7 @@ package eds

import (
"context"
"errors"
"fmt"

"github.com/filecoin-project/dagstore/index"
@@ -14,6 +15,9 @@

const invertedIndexPath = "/inverted_index/"

// ErrNotFoundInIndex is returned instead of ErrNotFound if the multihash doesn't exist in the index
var ErrNotFoundInIndex = fmt.Errorf("does not exist in index")

// simpleInvertedIndex is an inverted index that only stores a single shard key per multihash. Its
// implementation is modified from the default upstream implementation in dagstore/index.
type simpleInvertedIndex struct {
@@ -76,7 +80,7 @@ func (s *simpleInvertedIndex) GetShardsForMultihash(ctx context.Context, mh mult
key := ds.NewKey(string(mh))
sbz, err := s.ds.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to lookup index for mh %s, err: %w", mh, err)
return nil, errors.Join(ErrNotFoundInIndex, err)
}

return []shard.Key{shard.KeyFromString(string(sbz))}, nil
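
Side note, not part of the diff: the blockstore fallback above matches this joined error with errors.Is, which reports true for each operand of an errors.Join error (Go 1.20+). A minimal sketch of that behavior, using a placeholder for the underlying datastore error:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFoundInIndex mirrors the sentinel added in share/eds/inverted_index.go.
var ErrNotFoundInIndex = errors.New("does not exist in index")

func main() {
	// Placeholder standing in for the wrapped ds.Get lookup error.
	dsErr := errors.New("datastore: key not found")

	// GetShardsForMultihash now returns the two errors joined together.
	err := errors.Join(ErrNotFoundInIndex, dsErr)

	// Callers such as blockstore.Has/Get can still detect the sentinel...
	fmt.Println(errors.Is(err, ErrNotFoundInIndex)) // true
	// ...while the original cause is preserved in the error string.
	fmt.Println(err)
}
```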
2 changes: 2 additions & 0 deletions share/eds/retriever.go
@@ -122,6 +122,7 @@ type retrievalSession struct {
// newSession creates a new retrieval session and kicks off requesting process.
func (r *Retriever) newSession(ctx context.Context, dah *da.DataAvailabilityHeader) (*retrievalSession, error) {
size := len(dah.RowRoots)

treeFn := func(_ rsmt2d.Axis, index uint) rsmt2d.Tree {
// use proofs adder if provided, to cache collected proofs while recomputing the eds
var opts []nmt.Option
@@ -152,6 +153,7 @@ func (r *Retriever) newSession(ctx context.Context, dah *da.DataAvailabilityHead
for i := range ses.squareCellsLks {
ses.squareCellsLks[i] = make([]sync.Mutex, size)
}

go ses.request(ctx)
return ses, nil
}
2 changes: 1 addition & 1 deletion share/eds/store.go
@@ -119,7 +119,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
mounts: r,
cache: cache,
}
store.bs = newBlockstore(store, cache)
store.bs = newBlockstore(store, cache, ds)
return store, nil
}

9 changes: 8 additions & 1 deletion share/getters/getter_test.go
@@ -163,7 +163,8 @@ func TestIPLDGetter(t *testing.T) {
err = edsStore.Start(ctx)
require.NoError(t, err)

bserv := bsrv.New(edsStore.Blockstore(), offline.Exchange(edsStore.Blockstore()))
bStore := edsStore.Blockstore()
bserv := bsrv.New(bStore, offline.Exchange(bStore))
sg := NewIPLDGetter(bserv)

t.Run("GetShare", func(t *testing.T) {
@@ -200,6 +201,12 @@ func TestIPLDGetter(t *testing.T) {
retrievedEDS, err := sg.GetEDS(ctx, &dah)
require.NoError(t, err)
assert.True(t, randEds.Equals(retrievedEDS))

// Ensure blocks still exist after cleanup
colRoots, _ := retrievedEDS.ColRoots()
has, err := bStore.Has(ctx, ipld.MustCidFromNamespacedSha256(colRoots[0]))
assert.NoError(t, err)
assert.True(t, has)
})

t.Run("GetSharesByNamespace", func(t *testing.T) {
