Skip to content

Commit

Permalink
Delete records by batches
Browse files Browse the repository at this point in the history
  • Loading branch information
asdine committed Oct 21, 2019
1 parent 15cf09d commit 94d33fd
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 15 deletions.
48 changes: 41 additions & 7 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ func (stmt deleteStmt) IsReadOnly() bool {
return false
}

const bufferSize = 100

// Run deletes matching records by batches of bufferSize records.
// Some engines can't iterate while deleting keys (https://github.com/etcd-io/bbolt/issues/146)
// and some can't create more than one iterator per read-write transaction (https://github.com/dgraph-io/badger/issues/1093).
// To deal with these limitations, Run will iterate on a limited number of records, copy the keys
// to a buffer and delete them after the iteration is complete, and it will do that until there is no record
// left to delete.
// Increasing bufferSize will occasionate less key searches (O(log n) for most engines) but will take more memory.
func (stmt deleteStmt) Run(tx *Tx, args []driver.NamedValue) (Result, error) {
var res Result
if stmt.tableName == "" {
Expand All @@ -59,15 +68,40 @@ func (stmt deleteStmt) Run(tx *Tx, args []driver.NamedValue) (Result, error) {
}

st := record.NewStream(t)
st = st.Filter(whereClause(stmt.whereExpr, stack))
st = st.Filter(whereClause(stmt.whereExpr, stack)).Limit(bufferSize)

keys := make([][]byte, bufferSize)

for {
var i int

err = st.Iterate(func(r record.Record) error {
k, ok := r.(record.Keyer)
if !ok {
return errors.New("attempt to delete record without key")
}
// copy the key and reuse the buffer
keys[i] = append(keys[i][0:0], k.Key()...)
i++
return nil
})
if err != nil {
return res, err
}

keys = keys[:i]

err = st.Iterate(func(r record.Record) error {
if k, ok := r.(record.Keyer); ok {
return t.Delete(k.Key())
for _, key := range keys {
err = t.Delete(key)
if err != nil {
return res, err
}
}

return errors.New("attempt to delete record without key")
})
if i < bufferSize {
break
}
}

return res, err
return res, nil
}
9 changes: 3 additions & 6 deletions engine/badger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,9 @@ func (s *Store) AscendGreaterOrEqual(pivot []byte, fn func(k, v []byte) error) e
for it.Seek(seek); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()

v, err := item.ValueCopy(nil)
if err != nil {
return err
}

err = fn(bytes.TrimPrefix(item.KeyCopy(nil), prefix), v)
err := item.Value(func(v []byte) error {
return fn(bytes.TrimPrefix(item.Key(), prefix), v)
})
if err != nil {
return err
}
Expand Down
199 changes: 199 additions & 0 deletions engine/enginetest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"testing"

"github.com/asdine/genji"
"github.com/asdine/genji/engine"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -38,6 +39,8 @@ func TestSuite(t *testing.T, builder Builder) {
{"Store/Get", TestStoreGet},
{"Store/Delete", TestStoreDelete},
{"Store/Truncate", TestStoreTruncate},
{"TestQueries", TestQueries},
{"TestQueriesSameTransaction", TestQueriesSameTransaction},
}

for _, test := range tests {
Expand Down Expand Up @@ -837,3 +840,199 @@ func TestStoreTruncate(t *testing.T, builder Builder) {
require.NoError(t, err)
})
}

// TestQueries test simple queries against the engine.
func TestQueries(t *testing.T, builder Builder) {
t.Run("SELECT", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

st, err := db.Query(`
CREATE TABLE test;
INSERT INTO test (a) VALUES (1), (2), (3), (4);
SELECT * FROM test;
`)
require.NoError(t, err)
defer st.Close()
n, err := st.Count()
require.NoError(t, err)
require.Equal(t, 4, n)
})

t.Run("INSERT", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

err = db.Exec(`
CREATE TABLE test;
INSERT INTO test (a) VALUES (1), (2), (3), (4);
`)
require.NoError(t, err)
})

// t.Run("UPDATE", func(t *testing.T) {
// ng, cleanup := builder()
// defer cleanup()

// db, err := genji.New(ng)
// require.NoError(t, err)
// defer db.Close()

// st, err := db.Query(`
// CREATE TABLE test;
// INSERT INTO test (a) VALUES (1), (2), (3), (4);
// UPDATE test SET a = 5;
// SELECT * FROM test;
// `)
// require.NoError(t, err)
// defer st.Close()
// var buf bytes.Buffer
// err = recordutil.IteratorToJSON(&buf, st)
// require.NoError(t, err)
// require.Equal(t, `{"a":5}
// {"a":5}
// {"a":5}
// {"a":5}
// `, buf.String())
// })

t.Run("DELETE", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

err = db.Exec("CREATE TABLE test")
require.NoError(t, err)

err = db.Update(func(tx *genji.Tx) error {
for i := 1; i < 200; i++ {
err = tx.Exec("INSERT INTO test (a) VALUES (?)", i)
require.NoError(t, err)
}
return nil
})
require.NoError(t, err)

st, err := db.Query(`
DELETE FROM test WHERE a > 2;
SELECT * FROM test;
`)
require.NoError(t, err)
defer st.Close()
n, err := st.Count()
require.NoError(t, err)
require.Equal(t, 2, n)
})
}

// TestQueriesSameTransaction test simple queries in the same transaction.
func TestQueriesSameTransaction(t *testing.T, builder Builder) {
t.Run("SELECT", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

err = db.Update(func(tx *genji.Tx) error {
st, err := tx.Query(`
CREATE TABLE test;
INSERT INTO test (a) VALUES (1), (2), (3), (4);
SELECT * FROM test;
`)
require.NoError(t, err)
defer st.Close()
n, err := st.Count()
require.NoError(t, err)
require.Equal(t, 4, n)
return nil
})
require.NoError(t, err)
})

t.Run("INSERT", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

err = db.Update(func(tx *genji.Tx) error {
err = tx.Exec(`
CREATE TABLE test;
INSERT INTO test (a) VALUES (1), (2), (3), (4);
`)
require.NoError(t, err)
return nil
})
require.NoError(t, err)
})

// t.Run("UPDATE", func(t *testing.T) {
// ng, cleanup := builder()
// defer cleanup()

// db, err := genji.New(ng)
// require.NoError(t, err)
// defer db.Close()

// err = db.Update(func(tx *genji.Tx) error {
// st, err := tx.Query(`
// CREATE TABLE test;
// INSERT INTO test (a) VALUES (1), (2), (3), (4);
// UPDATE test SET a = 5;
// SELECT * FROM test;
// `)
// require.NoError(t, err)
// defer st.Close()
// var buf bytes.Buffer
// err = recordutil.IteratorToJSON(&buf, st)
// require.NoError(t, err)
// require.Equal(t, `{"a":5}
// {"a":5}
// {"a":5}
// {"a":5}
// `, buf.String())
// return nil
// })
// require.NoError(t, err)
// })

t.Run("DELETE", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

err = db.Update(func(tx *genji.Tx) error {
st, err := tx.Query(`
CREATE TABLE test;
INSERT INTO test (a) VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
DELETE FROM test WHERE a > 2;
SELECT * FROM test;
`)
require.NoError(t, err)
defer st.Close()
n, err := st.Count()
require.NoError(t, err)
require.Equal(t, 2, n)
return nil
})
require.NoError(t, err)
})
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/google/btree v1.0.0
github.com/oklog/ulid v1.3.1
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
go.etcd.io/bbolt v1.3.3 // indirect
github.com/stretchr/testify v1.4.0
go.etcd.io/bbolt v1.3.3
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 // indirect
golang.org/x/tools v0.0.0-20190912185636-87d9f09c5d89
)
Expand Down
19 changes: 19 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkBy
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand All @@ -12,6 +18,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger v1.6.1-0.20191021140416-be6402f83d31 h1:Ika9aisruYTmu5ZZiJz3f/HR6QIh6npi+74R84zLdfg=
github.com/dgraph-io/badger v1.6.1-0.20191021140416-be6402f83d31/go.mod h1:cEjdIw+iaGXuQdsDymXPRcpp8yHXZ6PmwmDJajnVyJc=
github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534 h1:9G6fVccQriMJu4nXwpwLDoy9y31t/KUSLAbPcoBgv+4=
github.com/dgraph-io/ristretto v0.0.0-20191010170704-2ba187ef9534/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand All @@ -21,6 +31,8 @@ github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHj
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
Expand All @@ -36,16 +48,21 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
Expand All @@ -68,4 +85,6 @@ golang.org/x/tools v0.0.0-20190912185636-87d9f09c5d89 h1:WiVZGyzQN7gPNLRkkpsNX3j
golang.org/x/tools v0.0.0-20190912185636-87d9f09c5d89/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

0 comments on commit 94d33fd

Please sign in to comment.