Skip to content

Commit

Permalink
[rdbms] Detect if Scanner is stuck in a loop
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Mar 18, 2024
1 parent 60c7f65 commit 1bf07e0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 27 deletions.
11 changes: 8 additions & 3 deletions lib/rdbms/primary_key/primary_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,19 @@ func (k *Keys) LoadValues(startingValues, endingValues []any) error {
return nil
}

func (k *Keys) UpdateStartingValue(keyName string, startingVal any) error {
// UpdateStartingValue sets the starting value for a primary key and returnes whether the starting value changed.
func (k *Keys) UpdateStartingValue(keyName string, startingVal any) (bool, error) {
idx := slices.IndexFunc(k.keys, func(x Key) bool { return x.Name == keyName })
if idx < 0 {
return fmt.Errorf("no key named %s", keyName)
return false, fmt.Errorf("no key named %s", keyName)
}

if equal(k.keys[idx].StartingValue, startingVal) {
return false, nil
}

k.keys[idx].StartingValue = startingVal
return nil
return true, nil
}

func (k *Keys) KeyNames() []string {
Expand Down
56 changes: 35 additions & 21 deletions lib/rdbms/primary_key/primary_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (

func TestNewKeys(t *testing.T) {
// ensure upsert doesn't mutate original arguments to `NewKeys`
{
keysArray := []Key{{Name: "foo", StartingValue: 20}, {Name: "bar"}}
keys := NewKeys(keysArray)
assert.NoError(t, keys.UpdateStartingValue("foo", "new starting value"))
assert.Equal(t, "foo", keys.keys[0].Name)
assert.Equal(t, "new starting value", keys.keys[0].StartingValue)
assert.Equal(t, "foo", keysArray[0].Name)
assert.Equal(t, 20, keysArray[0].StartingValue)
}
keysArray := []Key{{Name: "foo", StartingValue: 20}, {Name: "bar"}}
keys := NewKeys(keysArray)
changed, err := keys.UpdateStartingValue("foo", "new starting value")
assert.NoError(t, err)
assert.True(t, changed)
assert.Equal(t, "foo", keys.keys[0].Name)
assert.Equal(t, "new starting value", keys.keys[0].StartingValue)
assert.Equal(t, "foo", keysArray[0].Name)
assert.Equal(t, 20, keysArray[0].StartingValue)
}

func TestPrimaryKeys_LoadValues(t *testing.T) {
Expand Down Expand Up @@ -94,10 +94,11 @@ func TestPrimaryKeys_LoadValues(t *testing.T) {
func TestKeys_UpdateStartingValue(t *testing.T) {
type _tc struct {
name string
keys *Keys
keys []Key
keyName string
startingVal any

changed bool
expectedKeys []Key
expectedErr string
}
Expand All @@ -107,22 +108,33 @@ func TestKeys_UpdateStartingValue(t *testing.T) {
tcs := []_tc{
{
name: "Key doesn't exist",
keys: &Keys{
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
keyName: "Key2",
startingVal: startVal2,
expectedErr: "no key named Key2",
},
{
name: "Update existing key",
keys: &Keys{
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
name: "Update existing key with existing start value",
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
{Name: "Key2", StartingValue: 2, EndingValue: 2},
},
changed: false,
keyName: "Key1",
startingVal: "Start1",
expectedKeys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
{Name: "Key2", StartingValue: 2, EndingValue: 2},
},
},
{
name: "Update existing key with new value",
keys: []Key{
{Name: "Key1", StartingValue: "Start1", EndingValue: "End1"},
},
changed: true,
keyName: "Key1",
startingVal: startVal2,
expectedKeys: []Key{
Expand All @@ -132,12 +144,14 @@ func TestKeys_UpdateStartingValue(t *testing.T) {
}

for _, tc := range tcs {
err := tc.keys.UpdateStartingValue(tc.keyName, tc.startingVal)
keys := &Keys{tc.keys}
changed, err := keys.UpdateStartingValue(tc.keyName, tc.startingVal)
if tc.expectedErr != "" {
assert.ErrorContains(t, err, tc.expectedErr, tc.name)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.expectedKeys, tc.keys.keys, tc.name)
assert.Equal(t, tc.expectedKeys, keys.keys, tc.name)
assert.Equal(t, tc.changed, changed)
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions lib/rdbms/scan/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,31 @@ func (s *Scanner) Next() ([]map[string]any, error) {
return nil, err
}

s.isFirstBatch = false

if len(rows) == 0 || s.primaryKeys.IsExhausted() {
s.done = true
} else {
var startingValuesChanged bool

// Update the starting keys so that the next scan will pick off where we last left off.
lastRow := rows[len(rows)-1]
for _, pk := range s.primaryKeys.Keys() {
if err := s.primaryKeys.UpdateStartingValue(pk.Name, lastRow[pk.Name]); err != nil {
changed, err := s.primaryKeys.UpdateStartingValue(pk.Name, lastRow[pk.Name])
if err != nil {
s.done = true
return nil, err
}
startingValuesChanged = startingValuesChanged || changed
}

if !s.isFirstBatch && !startingValuesChanged {
// Detect sitiations where the scanner is stuck in a loop.
// The second batch will use a > comparision instead of a > comparision for the lower bound.
return nil, fmt.Errorf("primarky key start values did not change, scanner is stuck in a loop")
}
}

s.isFirstBatch = false

return rows, nil
}

Expand Down

0 comments on commit 1bf07e0

Please sign in to comment.