Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
implement batch delete (#12)
Browse files Browse the repository at this point in the history
* implement batch delete

* address code review and add a test
  • Loading branch information
whyrusleeping authored Jul 6, 2021
1 parent d0f4391 commit 9a9f76b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
36 changes: 36 additions & 0 deletions blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,42 @@ Retry:
return err
}

func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
b.oplock.RLock()
defer b.oplock.RUnlock()

Retry:
err := b.env.Update(func(txn *lmdb.Txn) error {
for _, c := range cids {
if err := txn.Del(b.db, c.Hash(), nil); err != nil {
return err
}
}
return nil
})
switch {
case err == nil || lmdb.IsNotFound(err): // shortcircuit happy path.
return nil
case lmdb.IsMapFull(err):
o := b.dedupGrow // take the deduplicator under the lock.
b.oplock.RUnlock() // drop the concurrent lock.
var err error
o.Do(func() { err = b.grow() })
if err != nil {
return fmt.Errorf("lmdb delete failed: %w", err)
}
b.oplock.RLock() // reclaim the concurrent lock.
goto Retry
case lmdb.IsErrno(err, lmdb.ReadersFull):
b.oplock.RUnlock() // yield.
b.sleep("delete many")
b.oplock.RLock()
goto Retry
}
return err

}

type cursor struct {
ctx context.Context
b *Blockstore
Expand Down
52 changes: 52 additions & 0 deletions blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,55 @@ func randomCID() cid.Cid {
mh, _ := multihash.Encode(b, multihash.SHA2_256)
return cid.NewCidV1(cid.Raw, mh)
}

type deleteManyer interface {
DeleteMany([]cid.Cid) error
}

func TestDeleteMany(t *testing.T) {
opts := Options{
InitialMmapSize: 1 << 10,
MmapGrowthStepFactor: 1.5,
MmapGrowthStepMax: 2 << 10,
}

bs, _ := newBlockstore(opts)(t)
defer bs.(io.Closer).Close()

var cids []cid.Cid
b := make([]byte, 1024)
for i := 0; i < 500; i++ {
rand.Read(b)
blk := blocks.NewBlock(b)
err := bs.Put(blk)
if err != nil {
fmt.Println(err)
}
require.NoError(t, err)
cids = append(cids, blk.Cid())
}

todelete := cids[:100]

dm := bs.(deleteManyer)

if err := dm.DeleteMany(todelete); err != nil {
t.Fatal(err)
}

deleted := make(map[cid.Cid]bool)
for _, d := range todelete {
deleted[d] = true
}

ch, err := bs.AllKeysChan(context.TODO())
if err != nil {
t.Fatal(err)
}

for c := range ch {
if deleted[c] {
t.Fatal("found cid in blockstore we deleted")
}
}
}

0 comments on commit 9a9f76b

Please sign in to comment.