[occ] Add basic worker task and scheduler shell #328

Merged (15 commits, Oct 17, 2023)
22 changes: 16 additions & 6 deletions baseapp/abci.go
@@ -13,6 +13,7 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/cosmos/cosmos-sdk/types/occ"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -237,13 +238,22 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
// DeliverTxBatch executes multiple txs
// TODO: support occ logic with scheduling
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
-// TODO: replace with actual scheduler logic
-// This is stubbed so that it does something sensible
//TODO: inject multiversion store without import cycle (figure out right place for this)
// ctx = ctx.WithMultiVersionStore(multiversion.NewMultiVersionStore())

tasks := make([]*occ.Task, 0, len(req.TxEntries))
for idx, tx := range req.TxEntries {
tasks = append(tasks, occ.NewTask(tx.Request, idx))
}

scheduler := occ.NewScheduler(app.concurrencyWorkers, app.DeliverTx)
if err := scheduler.ProcessAll(ctx, tasks); err != nil {
//TODO: handle error
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
-for _, tx := range req.TxEntries {
-responses = append(responses, &sdk.DeliverTxResult{
-Response: app.DeliverTx(ctx, tx.Request),
-})
for _, tx := range tasks {
responses = append(responses, &sdk.DeliverTxResult{Response: *tx.Response})
}
return sdk.DeliverTxBatchResponse{Results: responses}
}
155 changes: 152 additions & 3 deletions types/occ/scheduler.go
@@ -1,12 +1,17 @@
-package scheduler
package occ

-import "errors"
import (
"errors"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/abci/types"
"golang.org/x/sync/errgroup"
)

var (
ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting")
)

-// define the return struct for abort due to conflict
// Abort contains the information for a transaction's conflict
type Abort struct {
DependentTxIdx int
Err error
@@ -18,3 +23,147 @@ func NewEstimateAbort(dependentTxIdx int) Abort {
Err: ErrReadEstimate,
}
}

type TaskStatus string

const (
TaskStatusPending TaskStatus = "pending"
TaskStatusExecuted TaskStatus = "executed"
TaskStatusAborted TaskStatus = "aborted"
TaskStatusValidated TaskStatus = "validated"
)

type Task struct {
Status TaskStatus
Index int
Incarnation int
Request types.RequestDeliverTx
Response *types.ResponseDeliverTx
}

Review comment (Contributor):
do we need to store any data here about "dependent TX" for the ESTIMATE abort case?

Reply (Contributor, PR author):
yes once we do the estimate/validation pieces there are a few things we'll need to track
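
A hypothetical sketch of what that tracking could look like; none of these fields are in this PR and the names are invented.

// taskWithDependency illustrates one option: remember the Abort a task hit so
// its re-execution can be ordered after the dependent tx the abort identifies.
type taskWithDependency struct {
	Task
	Abort *Abort // nil unless execution stopped on an ESTIMATE read
}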

// TODO: (TBD) this might not be necessary to externalize, unless others
func NewTask(request types.RequestDeliverTx, index int) *Task {
return &Task{
Request: request,
Index: index,
Status: TaskStatusPending,
}
}

// Scheduler processes tasks concurrently
type Scheduler interface {
ProcessAll(ctx sdk.Context, tasks []*Task) error
}

type scheduler struct {
deliverTx func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)
workers int
}

// NewScheduler creates a new scheduler
func NewScheduler(workers int, deliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)) Scheduler {
return &scheduler{
workers: workers,
deliverTx: deliverTxFunc,
}
}

func (s *scheduler) ProcessAll(ctx sdk.Context, tasks []*Task) error {
toExecute := tasks
for len(toExecute) > 0 {

// execute sets statuses of tasks to either executed or aborted
err := s.executeAll(ctx, toExecute)
if err != nil {
return err
}

// validate returns any that should be re-executed
// note this processes ALL tasks, not just those recently executed
toExecute, err = s.validateAll(ctx, tasks)
if err != nil {
Review comment (Contributor):
we want validation to also be ongoing, not done after all executions are done; that way, if validation fails for tx X we can immediately re-execute it instead of having waited for ALL the txs to execute before realizing the validation failed

Reply (Contributor, PR author):
Is it clear that as soon as validation fails is when we know it should be immediately re-executed? (If we have to prioritize re-executions?) That wasn't yet clear to me because it seems like we have to do a full-validation at the end anyway.
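
Purely illustrative, not this PR's control flow: one way the interleaved validation discussed above could look, where a task is requeued the moment its validation fails rather than after the whole batch has executed. executedCh, pendingCh, and validate are assumed placeholders, not identifiers in this codebase.

// revalidateLoop sketches ongoing validation: tasks are validated as their
// executions complete, and any conflict is re-queued for execution at once.
func revalidateLoop(executedCh <-chan *Task, pendingCh chan<- *Task, validate func(*Task) bool) {
	for task := range executedCh {
		if validate(task) {
			task.Status = TaskStatusValidated
			continue
		}
		// validation failed: bump the incarnation and re-execute immediately
		// instead of waiting for ALL txs to finish executing first
		task.Incarnation++
		task.Status = TaskStatusPending
		pendingCh <- task
	}
}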

return err
}
for _, t := range toExecute {
t.Incarnation++
t.Status = TaskStatusPending
//TODO: reset anything that needs resetting
}
}
return nil
}

// TODO: validate each task
// TODO: return list of tasks that are invalid
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*Task) ([]*Task, error) {
var res []*Task
for _, t := range tasks {
// any aborted tx is known to be suspect here
if t.Status == TaskStatusAborted {
res = append(res, t)
} else {
//TODO: validate the task and add it if invalid
//TODO: create and handle abort for validation
t.Status = TaskStatusValidated
}
}
return res, nil
}

// executeAll executes all tasks concurrently
// Tasks are updated with their status
// TODO: retries on aborted tasks
// TODO: error scenarios
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*Task) error {
ch := make(chan *Task, len(tasks))
grp, gCtx := errgroup.WithContext(ctx.Context())

Review comment (Contributor):
shouldn't we create the context WithCancel so that we can kill off the workers from the scheduler? we check the context.Done case below, but we would still need to indicate Done for the context via Cancel right?

Reply (Contributor, PR author):
With errgroups, one can just return an error like context timeout error, and all ctx.Done()s will cancel. If something outside a task needs to cancel, then yes we can wrap errgroup with a WithCancel context, but then it's still going to be the ctx.Done() that exits those routines.
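
For reference, a minimal standalone sketch (separate from this PR's code) of the errgroup behavior described in the reply: returning an error from any goroutine in the group cancels the group context, which is what lets the ctx.Done() branches in the workers exit without an explicit WithCancel.

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	grp, gCtx := errgroup.WithContext(context.Background())

	// A worker that only exits once the group context is cancelled.
	grp.Go(func() error {
		<-gCtx.Done()
		fmt.Println("worker exiting:", gCtx.Err())
		return gCtx.Err()
	})

	// Returning an error from any goroutine cancels gCtx for all the others.
	grp.Go(func() error {
		return errors.New("simulated task failure")
	})

	// Wait returns the first non-nil error returned by the group.
	fmt.Println("group error:", grp.Wait())
}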

// a workers value < 1 means no limit
workers := s.workers
if s.workers < 1 {
workers = len(tasks)
}

for i := 0; i < workers; i++ {
grp.Go(func() error {
for {
select {
case <-gCtx.Done():
return gCtx.Err()
case task, ok := <-ch:
if !ok {
return nil
Review comment (Contributor):
can we refactor this logic under case to a function like receiveTask?
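
One possible shape for the helper suggested here, illustrative only and not part of this PR; it reuses the scheduler and Task definitions above. The worker's channel case would then reduce to s.receiveTask(ctx, task) after the !ok check.

// receiveTask is a hypothetical extraction of the per-task work from the
// select loop; the mvkv wrapping / abort handling would also live here.
func (s *scheduler) receiveTask(ctx sdk.Context, task *Task) {
	resp := s.deliverTx(ctx, task.Request)
	task.Status = TaskStatusExecuted
	task.Response = &resp
}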

}
//TODO: ensure version multi store is on context
//abortCh := make(chan Abort)
resp := s.deliverTx(ctx, task.Request)
Review comment (Contributor):
we can wrap mvkv here prior to calling deliverTx


Review comment (Contributor):
wouldn't it be better here to call deliverTx async? Otherwise there would be a deadlock since we'd never get to the channel read below. Then below we'd have a select statement over the abort channel, the deliverTx response, or context cancellation.

Reply (Contributor, PR author):
(I added a comment on this, but basically if the abort channel is buffered, writes won't block)
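
A small self-contained sketch (not part of this PR) of the point above: with a buffered abort channel the multiversion store can report an abort without blocking, and the scheduler can do a non-blocking check after deliverTx returns.

package main

import "fmt"

// Abort mirrors the shape of the Abort type above; this file is illustrative only.
type Abort struct {
	DependentTxIdx int
}

func main() {
	// Capacity 1: a single abort can be recorded without the writer blocking,
	// even though the reader only looks at the channel after deliverTx returns.
	abortCh := make(chan Abort, 1)

	abortCh <- Abort{DependentTxIdx: 3} // does not block, buffer has room

	select {
	case abort := <-abortCh:
		fmt.Println("aborted; dependent tx index:", abort.DependentTxIdx)
	default:
		fmt.Println("no abort reported")
	}
}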

//if _, ok := <-abortCh; ok {
// task.Status = TaskStatusAborted
// continue
//}

task.Status = TaskStatusExecuted
task.Response = &resp
}
}
})
}
grp.Go(func() error {
defer close(ch)
for _, task := range tasks {
select {
case <-gCtx.Done():
return gCtx.Err()
case ch <- task:
}
}
return nil
})

if err := grp.Wait(); err != nil {
return err
}

return nil
}
68 changes: 68 additions & 0 deletions types/occ/scheduler_test.go
@@ -0,0 +1,68 @@
package occ

import (
"context"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/abci/types"
"testing"
)

type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx

func (f mockDeliverTxFunc) DeliverTx(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return f(ctx, req)
}

func createTasks(n int) []*Task {
tasks := make([]*Task, n)
for i := 0; i < n; i++ {
tasks[i] = NewTask(types.RequestDeliverTx{}, i)
}
return tasks
}

func TestProcessAll(t *testing.T) {
tests := []struct {
name string
workers int
tasks []*Task
deliverTxFunc mockDeliverTxFunc
expectedStatus TaskStatus
expectedErr error
expectedInc int
}{
{
name: "All tasks processed without aborts",
workers: 2,
tasks: createTasks(5),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return types.ResponseDeliverTx{}
},
expectedStatus: TaskStatusValidated,
expectedErr: nil,
expectedInc: 0,
},
//TODO: Add more test cases
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := NewScheduler(tt.workers, tt.deliverTxFunc.DeliverTx)
ctx := sdk.Context{}.WithContext(context.Background())

err := s.ProcessAll(ctx, tt.tasks)
if err != tt.expectedErr {
t.Errorf("Expected error %v, got %v", tt.expectedErr, err)
}

for _, task := range tt.tasks {
if task.Status != tt.expectedStatus {
t.Errorf("Expected task status to be %s, got %s", tt.expectedStatus, task.Status)
}
if task.Incarnation != tt.expectedInc {
t.Errorf("Expected task incarnation to be %d, got %d", tt.expectedInc, task.Incarnation)
}
}
})
}
}