Skip to content

Commit

Permalink
Refactor sync fallback to appropriately validate and re-execute as nece…
Browse files Browse the repository at this point in the history
…ssary (#461)

## Describe your changes and provide context
This refactors the sync fallback such that it will re-execute (and
revalidate previously-validated transactions to determine if they need execution) synchronously
after exceeding maxIterations

## Testing performed to validate your change
Unit test + need to perform loadtesting on main; some loadtesting
performed on the seiv2 version
  • Loading branch information
udpatil authored Mar 14, 2024
1 parent df756ef commit 0320b34
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 24 deletions.
2 changes: 2 additions & 0 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,11 @@ func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
if value != nil {
// conflict
// TODO: would we want to return early?
conflictSet[latestValue.Index()] = struct{}{}
valid = false
}
} else if !bytes.Equal(latestValue.Value(), value) {
conflictSet[latestValue.Index()] = struct{}{}
valid = false
}
}
Expand Down
14 changes: 7 additions & 7 deletions store/multiversion/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,20 @@ func TestMultiVersionStoreValidateState(t *testing.T) {
"key3": []byte("value6"),
})

// expect failure with empty conflicts
// expect failure with conflict of tx 2
valid, conflicts = mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Empty(t, conflicts)
require.Equal(t, []int{2}, conflicts)

// add a conflict due to deletion
mvs.SetWriteset(3, 1, map[string][]byte{
"key1": nil,
})

// expect failure with empty conflicts
// expect failure with conflict of tx 2 and 3
valid, conflicts = mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Empty(t, conflicts)
require.Equal(t, []int{2, 3}, conflicts)

// add a conflict due to estimate
mvs.SetEstimatedWriteset(4, 1, map[string][]byte{
Expand All @@ -230,7 +230,7 @@ func TestMultiVersionStoreValidateState(t *testing.T) {
// expect index 4 to be returned
valid, conflicts = mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Equal(t, []int{4}, conflicts)
require.Equal(t, []int{2, 3, 4}, conflicts)
}

func TestMultiVersionStoreParentValidationMismatch(t *testing.T) {
Expand Down Expand Up @@ -436,10 +436,10 @@ func TestMVSIteratorValidationWithKeySwitch(t *testing.T) {
writeset2["key3"] = []byte("valueX")
mvs.SetWriteset(2, 2, writeset2)

// should be invalid
// should be invalid with conflict of 2
valid, conflicts := mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Empty(t, conflicts)
require.Equal(t, []int{2}, conflicts)
}

func TestMVSIteratorValidationWithKeyAdded(t *testing.T) {
Expand Down
46 changes: 30 additions & 16 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (
statusValidated status = "validated"
// statusWaiting tasks are waiting for another tx to complete
statusWaiting status = "waiting"
// maximumIncarnation before we revert to sequential (for high conflict rates)
maximumIncarnation = 5
// maximumIterations before we revert to sequential (for high conflict rates)
maximumIterations = 10
)

type deliverTxTask struct {
Expand Down Expand Up @@ -261,6 +261,7 @@ func (s *scheduler) emitMetrics() {
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
var iterations int
// initialize multi-version stores if they haven't been initialized yet
s.tryInitMultiVersionStore(ctx)
// prefill estimates
Expand Down Expand Up @@ -289,34 +290,32 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
toExecute := tasks
for !allValidated(tasks) {
// if we have reached maximumIterations, we should revert to synchronous
if s.maxIncarnation >= maximumIncarnation {
if iterations >= maximumIterations {
// process synchronously
s.synchronous = true
// execute all non-validated tasks (no more "waiting" status)
toExecute = filterTasks(tasks, func(t *deliverTxTask) bool {
return !t.IsStatus(statusValidated)
})
startIdx, anyLeft := s.findFirstNonValidated()
if !anyLeft {
break
}
toExecute = tasks[startIdx:]
}

var err error

// execute sets statuses of tasks to either executed or aborted
if len(toExecute) > 0 {
err = s.executeAll(ctx, toExecute)
if err != nil {
return nil, err
}
if err := s.executeAll(ctx, toExecute); err != nil {
return nil, err
}

// validate returns any that should be re-executed
// note this processes ALL tasks, not just those recently executed
toExecute, err = s.validateAll(ctx, tasks)
toExecute, err := s.validateAll(ctx, tasks)
if err != nil {
return nil, err
}
// these are retries which apply to metrics
s.metrics.retries += len(toExecute)
iterations++
}

for _, mv := range s.multiVersionStores {
mv.WriteLatestToStore()
}
Expand Down Expand Up @@ -420,7 +419,11 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del

// ExecuteAll executes all tasks concurrently
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
if len(tasks) == 0 {
return nil
}
ctx, span := s.traceSpan(ctx, "SchedulerExecuteAll", nil)
span.SetAttributes(attribute.Bool("synchronous", s.synchronous))
defer span.End()

// validationWg waits for all validations to complete
Expand Down Expand Up @@ -499,6 +502,17 @@ func (s *scheduler) executeTask(task *deliverTxTask) {
defer dSpan.End()
task.Ctx = dCtx

// in the synchronous case, we only want to re-execute tasks that need re-executing
// if already validated, then this does another validation
if s.synchronous && task.IsStatus(statusValidated) {
s.shouldRerun(task)
if task.IsStatus(statusValidated) {
return
}
task.Reset()
task.Increment()
}

s.prepareTask(task)

resp := s.deliverTx(task.Ctx, task.Request)
Expand All @@ -513,7 +527,7 @@ func (s *scheduler) executeTask(task *deliverTxTask) {
abort, ok := <-task.AbortCh
if ok {
// if there is an abort item that means we need to wait on the dependent tx
task.SetStatus(statusWaiting)
task.SetStatus(statusAborted)
task.Abort = &abort
task.AppendDependencies([]int{abort.DependentTxIdx})
}
Expand Down
39 changes: 38 additions & 1 deletion tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
_ "net/http/pprof"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -264,6 +266,41 @@ func TestProcessAll(t *testing.T) {
},
expectedErr: nil,
},
{
name: "Test every tx accesses same key with delays",
workers: 50,
runs: 1,
addStores: true,
requests: requestList(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) (response types.ResponseDeliverTx) {
defer abortRecoveryFunc(&response)
wait := rand.Intn(10)
time.Sleep(time.Duration(wait) * time.Millisecond)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
time.Sleep(time.Duration(wait) * time.Millisecond)
// write to the store with this tx's index
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: newVal,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
expected := ""
for idx, response := range res {
expected = expected + fmt.Sprintf("%d", idx)
require.Equal(t, expected, response.Info)
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, expected, string(latest))
},
expectedErr: nil,
},
}

for _, tt := range tests {
Expand All @@ -285,7 +322,7 @@ func TestProcessAll(t *testing.T) {
}

res, err := s.ProcessAll(ctx, tt.requests)
require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIncarnation)
require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIterations)
require.Len(t, res, len(tt.requests))

if !errors.Is(err, tt.expectedErr) {
Expand Down

0 comments on commit 0320b34

Please sign in to comment.