Skip to content

Commit

Permalink
Cache indexes when getting a table
Browse files Browse the repository at this point in the history
  • Loading branch information
asdine committed Oct 21, 2019
1 parent 94d33fd commit dd30b92
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 160 deletions.
80 changes: 39 additions & 41 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,7 @@ func (tx Tx) CreateTable(name string) (*Table, error) {
return nil, errors.Wrapf(err, "failed to create table %q", name)
}

s, err := tx.tx.Store(name)
return &Table{
tx: &tx,
store: s,
name: name,
}, nil
return tx.GetTable(name)
}

// GetTable returns a table by name. The table instance is only valid for the lifetime of the transaction.
Expand All @@ -257,11 +252,18 @@ func (tx Tx) GetTable(name string) (*Table, error) {
return nil, err
}

return &Table{
t := Table{
tx: &tx,
store: s,
name: name,
}, nil
}

t.indexes, err = t.Indexes()
if err != nil {
return nil, err
}

return &t, nil
}

// DropTable deletes a table from the database.
Expand Down Expand Up @@ -390,9 +392,10 @@ func (tx Tx) DropIndex(name string) error {

// A Table represents a collection of records.
type Table struct {
tx *Tx
store engine.Store
name string
tx *Tx
store engine.Store
name string
indexes map[string]Index
}

type encodedRecordWithKey struct {
Expand Down Expand Up @@ -481,12 +484,7 @@ func (t Table) Insert(r record.Record) ([]byte, error) {
return nil, err
}

indexes, err := t.Indexes()
if err != nil {
return nil, err
}

for _, idx := range indexes {
for _, idx := range t.indexes {
f, err := r.GetField(idx.FieldName)
if err != nil {
continue
Expand All @@ -508,27 +506,24 @@ func (t Table) Insert(r record.Record) ([]byte, error) {
// Delete a record by key.
// Indexes are automatically updated.
func (t Table) Delete(key []byte) error {
err := t.store.Delete(key)
r, err := t.GetRecord(key)
if err != nil {
if err == engine.ErrKeyNotFound {
return ErrRecordNotFound
}
return err
}

indexes, err := t.Indexes()
if err != nil {
return err
}
for _, idx := range t.indexes {
f, err := r.GetField(idx.FieldName)
if err != nil {
return err
}

for _, idx := range indexes {
err = idx.Delete(key)
err = idx.Delete(f.Data, key)
if err != nil {
return err
}
}

return nil
return t.store.Delete(key)
}

type pkWrapper struct {
Expand All @@ -545,23 +540,19 @@ func (p pkWrapper) PrimaryKey() ([]byte, error) {
// Indexes are automatically updated.
func (t Table) Replace(key []byte, r record.Record) error {
// make sure key exists
_, err := t.store.Get(key)
old, err := t.GetRecord(key)
if err != nil {
if err == engine.ErrKeyNotFound {
return ErrRecordNotFound
}

return err
}

// remove key from indexes
indexes, err := t.Indexes()
if err != nil {
return err
}
for _, idx := range t.indexes {
f, err := old.GetField(idx.FieldName)
if err != nil {
return err
}

for _, idx := range indexes {
err = idx.Delete(key)
err = idx.Delete(f.Data, key)
if err != nil {
return err
}
Expand All @@ -580,7 +571,7 @@ func (t Table) Replace(key []byte, r record.Record) error {
}

// update indexes
for _, idx := range indexes {
for _, idx := range t.indexes {
f, err := r.GetField(idx.FieldName)
if err != nil {
continue
Expand All @@ -607,11 +598,17 @@ func (t Table) TableName() string {

// Indexes returns a map of all the indexes of a table.
func (t Table) Indexes() (map[string]Index, error) {
tb, err := t.tx.GetTable(indexTable)
s, err := t.tx.tx.Store(indexTable)
if err != nil {
return nil, err
}

tb := Table{
tx: t.tx,
store: s,
name: indexTable,
}

tableName := []byte(t.name)
indexes := make(map[string]Index)

Expand Down Expand Up @@ -653,6 +650,7 @@ func (t Table) Indexes() (map[string]Index, error) {
return nil, err
}

t.indexes = indexes
return indexes, nil
}

Expand Down
5 changes: 4 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,15 @@ func TestTableInsert(t *testing.T) {
tx, cleanup := newTestDB(t)
defer cleanup()

tb, err := tx.CreateTable("test")
_, err := tx.CreateTable("test")
require.NoError(t, err)

idx, err := tx.CreateIndex("idxFoo", "test", "foo", index.Options{})
require.NoError(t, err)

tb, err := tx.GetTable("test")
require.NoError(t, err)

rec := newRecord()
foo := record.NewFloat32Field("foo", 10)
rec = append(rec, foo)
Expand Down
12 changes: 6 additions & 6 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ func (stmt deleteStmt) IsReadOnly() bool {
return false
}

const bufferSize = 100
const deleteBufferSize = 100

// Run deletes matching records by batches of bufferSize records.
// Run deletes matching records by batches of deleteBufferSize records.
// Some engines can't iterate while deleting keys (https://github.com/etcd-io/bbolt/issues/146)
// and some can't create more than one iterator per read-write transaction (https://github.com/dgraph-io/badger/issues/1093).
// To deal with these limitations, Run will iterate on a limited number of records, copy the keys
// to a buffer and delete them after the iteration is complete, and it will do that until there is no record
// left to delete.
// Increasing bufferSize will occasionate less key searches (O(log n) for most engines) but will take more memory.
// Increasing deleteBufferSize will occasionate less key searches (O(log n) for most engines) but will take more memory.
func (stmt deleteStmt) Run(tx *Tx, args []driver.NamedValue) (Result, error) {
var res Result
if stmt.tableName == "" {
Expand All @@ -68,9 +68,9 @@ func (stmt deleteStmt) Run(tx *Tx, args []driver.NamedValue) (Result, error) {
}

st := record.NewStream(t)
st = st.Filter(whereClause(stmt.whereExpr, stack)).Limit(bufferSize)
st = st.Filter(whereClause(stmt.whereExpr, stack)).Limit(deleteBufferSize)

keys := make([][]byte, bufferSize)
keys := make([][]byte, deleteBufferSize)

for {
var i int
Expand Down Expand Up @@ -98,7 +98,7 @@ func (stmt deleteStmt) Run(tx *Tx, args []driver.NamedValue) (Result, error) {
}
}

if i < bufferSize {
if i < deleteBufferSize {
break
}
}
Expand Down
110 changes: 56 additions & 54 deletions engine/enginetest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
package enginetest

import (
"bytes"
"errors"
"fmt"
"testing"

"github.com/asdine/genji"
"github.com/asdine/genji/engine"
"github.com/asdine/genji/record/recordutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -878,31 +880,31 @@ func TestQueries(t *testing.T, builder Builder) {
require.NoError(t, err)
})

// t.Run("UPDATE", func(t *testing.T) {
// ng, cleanup := builder()
// defer cleanup()

// db, err := genji.New(ng)
// require.NoError(t, err)
// defer db.Close()

// st, err := db.Query(`
// CREATE TABLE test;
// INSERT INTO test (a) VALUES (1), (2), (3), (4);
// UPDATE test SET a = 5;
// SELECT * FROM test;
// `)
// require.NoError(t, err)
// defer st.Close()
// var buf bytes.Buffer
// err = recordutil.IteratorToJSON(&buf, st)
// require.NoError(t, err)
// require.Equal(t, `{"a":5}
// {"a":5}
// {"a":5}
// {"a":5}
// `, buf.String())
// })
t.Run("UPDATE", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

st, err := db.Query(`
CREATE TABLE test;
INSERT INTO test (a) VALUES (1), (2), (3), (4);
UPDATE test SET a = 5;
SELECT * FROM test;
`)
require.NoError(t, err)
defer st.Close()
var buf bytes.Buffer
err = recordutil.IteratorToJSON(&buf, st)
require.NoError(t, err)
require.Equal(t, `{"a":5}
{"a":5}
{"a":5}
{"a":5}
`, buf.String())
})

t.Run("DELETE", func(t *testing.T) {
ng, cleanup := builder()
Expand Down Expand Up @@ -981,35 +983,35 @@ func TestQueriesSameTransaction(t *testing.T, builder Builder) {
require.NoError(t, err)
})

// t.Run("UPDATE", func(t *testing.T) {
// ng, cleanup := builder()
// defer cleanup()

// db, err := genji.New(ng)
// require.NoError(t, err)
// defer db.Close()

// err = db.Update(func(tx *genji.Tx) error {
// st, err := tx.Query(`
// CREATE TABLE test;
// INSERT INTO test (a) VALUES (1), (2), (3), (4);
// UPDATE test SET a = 5;
// SELECT * FROM test;
// `)
// require.NoError(t, err)
// defer st.Close()
// var buf bytes.Buffer
// err = recordutil.IteratorToJSON(&buf, st)
// require.NoError(t, err)
// require.Equal(t, `{"a":5}
// {"a":5}
// {"a":5}
// {"a":5}
// `, buf.String())
// return nil
// })
// require.NoError(t, err)
// })
t.Run("UPDATE", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

db, err := genji.New(ng)
require.NoError(t, err)
defer db.Close()

err = db.Update(func(tx *genji.Tx) error {
st, err := tx.Query(`
CREATE TABLE test;
INSERT INTO test (a) VALUES (1), (2), (3), (4);
UPDATE test SET a = 5;
SELECT * FROM test;
`)
require.NoError(t, err)
defer st.Close()
var buf bytes.Buffer
err = recordutil.IteratorToJSON(&buf, st)
require.NoError(t, err)
require.Equal(t, `{"a":5}
{"a":5}
{"a":5}
{"a":5}
`, buf.String())
return nil
})
require.NoError(t, err)
})

t.Run("DELETE", func(t *testing.T) {
ng, cleanup := builder()
Expand Down
Loading

0 comments on commit dd30b92

Please sign in to comment.