[occ] Add basic worker task and scheduler shell #328
Changed file: the scheduler/task shell (package renamed from scheduler to occ)

@@ -1,12 +1,17 @@
-package scheduler
+package occ
 
-import "errors"
+import (
+	"errors"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"github.com/tendermint/tendermint/abci/types"
+	"golang.org/x/sync/errgroup"
+)
 
 var (
 	ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting")
 )
 
-// define the return struct for abort due to conflict
+// Abort contains the information for a transaction's conflict
 type Abort struct {
 	DependentTxIdx int
 	Err            error
@@ -18,3 +23,147 @@ func NewEstimateAbort(dependentTxIdx int) Abort {
 		Err: ErrReadEstimate,
 	}
 }
+
+type TaskStatus string
+
+const (
+	TaskStatusPending   TaskStatus = "pending"
+	TaskStatusExecuted  TaskStatus = "executed"
+	TaskStatusAborted   TaskStatus = "aborted"
+	TaskStatusValidated TaskStatus = "validated"
+)
+
+type Task struct {
+	Status      TaskStatus
+	Index       int
+	Incarnation int
+	Request     types.RequestDeliverTx
+	Response    *types.ResponseDeliverTx
+}
+
+// TODO: (TBD) this might not be necessary to externalize, unless others
+func NewTask(request types.RequestDeliverTx, index int) *Task {
+	return &Task{
+		Request: request,
+		Index:   index,
+		Status:  TaskStatusPending,
+	}
+}
+
+// Scheduler processes tasks concurrently
+type Scheduler interface {
+	ProcessAll(ctx sdk.Context, tasks []*Task) error
+}
+
+type scheduler struct {
+	deliverTx func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)
+	workers   int
+}
+
+// NewScheduler creates a new scheduler
+func NewScheduler(workers int, deliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)) Scheduler {
+	return &scheduler{
+		workers:   workers,
+		deliverTx: deliverTxFunc,
+	}
+}
+
+func (s *scheduler) ProcessAll(ctx sdk.Context, tasks []*Task) error {
+	toExecute := tasks
+	for len(toExecute) > 0 {
+
+		// execute sets statuses of tasks to either executed or aborted
+		err := s.executeAll(ctx, toExecute)
+		if err != nil {
+			return err
+		}
+
+		// validate returns any that should be re-executed
+		// note this processes ALL tasks, not just those recently executed
+		toExecute, err = s.validateAll(ctx, tasks)
+		if err != nil {
+			return err
+		}
+		for _, t := range toExecute {
+			t.Incarnation++
+			t.Status = TaskStatusPending
+			//TODO: reset anything that needs resetting
+		}
+	}
+	return nil
+}

Review comment (on the validateAll call in ProcessAll): we want validation to also be ongoing and not done after all executions are done; that way, if we fail validation for tx X we can immediately re-execute it instead of having waited for ALL the txs to have executed before realizing the validation failed.

Reply: Is it clear that the moment validation fails is when we know it should be immediately re-executed (if we have to prioritize re-executions)? That wasn't yet clear to me, because it seems like we have to do a full validation at the end anyway.
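To make the suggestion above concrete, here is a rough, self-contained sketch of validating each transaction as soon as its execution finishes and re-executing immediately on failure, rather than only after a whole execution round. The `tx` type and the `execute`/`validate` stubs are toy placeholders, not this PR's code, and the sketch glosses over cross-transaction invalidation, which is why the reply notes a full validation pass at the end would still be needed.

```go
package main

import (
	"fmt"
	"sync"
)

// Toy stand-ins for the PR's Task and DeliverTx plumbing.
type tx struct {
	index       int
	incarnation int
}

func execute(t *tx) { t.incarnation++ }

// Pretend every first execution conflicts, so each tx needs exactly two runs.
func validate(t *tx) bool { return t.incarnation >= 2 }

func main() {
	work := make(chan *tx, 4)
	for i := 0; i < 4; i++ {
		work <- &tx{index: i}
	}
	close(work)

	var wg sync.WaitGroup
	for w := 0; w < 2; w++ { // two workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range work {
				// Execute, then validate right away; re-execute on failure
				// instead of waiting for a whole-block execution round.
				for {
					execute(t)
					if validate(t) {
						break
					}
				}
				fmt.Printf("tx %d validated at incarnation %d\n", t.index, t.incarnation)
			}
		}()
	}
	wg.Wait()
}
```

In the PR's terms, this would roughly mean folding validation into the worker goroutines in executeAll rather than running a separate validateAll round after each full execution pass.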
+
+// TODO: validate each task
+// TODO: return list of tasks that are invalid
+func (s *scheduler) validateAll(ctx sdk.Context, tasks []*Task) ([]*Task, error) {
+	var res []*Task
+	for _, t := range tasks {
+		// any aborted tx is known to be suspect here
+		if t.Status == TaskStatusAborted {
+			res = append(res, t)
+		} else {
+			//TODO: validate the task and add it if invalid
+			//TODO: create and handle abort for validation
+			t.Status = TaskStatusValidated
+		}
+	}
+	return res, nil
+}
+
+// ExecuteAll executes all tasks concurrently
+// Tasks are updated with their status
+// TODO: retries on aborted tasks
+// TODO: error scenarios
+func (s *scheduler) executeAll(ctx sdk.Context, tasks []*Task) error {
+	ch := make(chan *Task, len(tasks))
+	grp, gCtx := errgroup.WithContext(ctx.Context())
+

Review comment (on the errgroup.WithContext line): shouldn't we create the context

Reply: With errgroups, one can just return an error like a context timeout error, and all ctx.Done()s will cancel. If something outside a task needs to cancel, then yes, we can wrap the errgroup with a WithCancel context, but then it's still going to be the ctx.Done() that exits those routines.
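A small standalone program (not part of this PR) illustrating the behavior the reply relies on: with errgroup.WithContext, any goroutine that returns a non-nil error cancels the group's context, so sibling goroutines waiting on gCtx.Done() exit without a separate WithCancel wrapper.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	grp, gCtx := errgroup.WithContext(context.Background())

	// One goroutine fails...
	grp.Go(func() error {
		return errors.New("simulated failure")
	})

	// ...which cancels the group's context, so a sibling blocked on
	// gCtx.Done() exits instead of hanging.
	grp.Go(func() error {
		<-gCtx.Done()
		return gCtx.Err()
	})

	// Wait returns the first non-nil error: "simulated failure".
	fmt.Println(grp.Wait())
}
```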
+	// a workers value < 1 means no limit
+	workers := s.workers
+	if s.workers < 1 {
+		workers = len(tasks)
+	}
+
+	for i := 0; i < workers; i++ {
+		grp.Go(func() error {
+			for {
+				select {
+				case <-gCtx.Done():
+					return gCtx.Err()
+				case task, ok := <-ch:
+					if !ok {
+						return nil
+					}

Review comment (on the channel-receive branch): can we refactor this logic under

+					//TODO: ensure version multi store is on context
+					//abortCh := make(chan Abort)
+					resp := s.deliverTx(ctx, task.Request)

Review comment (on the deliverTx call): we can wrap mvkv here prior to calling deliverTx

+
+					//if _, ok := <-abortCh; ok {
+					//	task.Status = TaskStatusAborted
+					//	continue
+					//}

Review comment (on the commented-out abort handling above): wouldn't it be better here to call

Reply: (I added a comment on this, but basically if the abort channel is buffered, writes won't block.)
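A standalone illustration (not part of this PR) of the buffered-channel point in the reply: a send on a buffered abort channel returns immediately while there is spare capacity, so the writer never blocks waiting for the scheduler to read the abort. The abort type below is a stand-in for the package's Abort struct.

```go
package main

import "fmt"

// Stand-in for the package's Abort struct.
type abort struct{ DependentTxIdx int }

func main() {
	// Capacity 1: one pending abort can be written with no reader present.
	abortCh := make(chan abort, 1)

	abortCh <- abort{DependentTxIdx: 3} // returns immediately; does not block

	// The reader can drain it later, non-blockingly, with a default case.
	select {
	case a := <-abortCh:
		fmt.Println("aborted; dependent tx index:", a.DependentTxIdx)
	default:
		fmt.Println("no abort signalled")
	}
}
```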
+
+					task.Status = TaskStatusExecuted
+					task.Response = &resp
+				}
+			}
+		})
+	}
+	grp.Go(func() error {
+		defer close(ch)
+		for _, task := range tasks {
+			select {
+			case <-gCtx.Done():
+				return gCtx.Err()
+			case ch <- task:
+			}
+		}
+		return nil
+	})
+
+	if err := grp.Wait(); err != nil {
+		return err
+	}
+
+	return nil
+}
New file: scheduler tests

@@ -0,0 +1,68 @@
+package occ
+
+import (
+	"context"
+	sdk "github.com/cosmos/cosmos-sdk/types"
+	"github.com/tendermint/tendermint/abci/types"
+	"testing"
+)
+
+type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx
+
+func (f mockDeliverTxFunc) DeliverTx(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
+	return f(ctx, req)
+}
+
+func createTasks(n int) []*Task {
+	tasks := make([]*Task, n)
+	for i := 0; i < n; i++ {
+		tasks[i] = NewTask(types.RequestDeliverTx{}, i)
+	}
+	return tasks
+}
+
+func TestProcessAll(t *testing.T) {
+	tests := []struct {
+		name           string
+		workers        int
+		tasks          []*Task
+		deliverTxFunc  mockDeliverTxFunc
+		expectedStatus TaskStatus
+		expectedErr    error
+		expectedInc    int
+	}{
+		{
+			name:    "All tasks processed without aborts",
+			workers: 2,
+			tasks:   createTasks(5),
+			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
+				return types.ResponseDeliverTx{}
+			},
+			expectedStatus: TaskStatusValidated,
+			expectedErr:    nil,
+			expectedInc:    0,
+		},
+		//TODO: Add more test cases
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := NewScheduler(tt.workers, tt.deliverTxFunc.DeliverTx)
+			ctx := sdk.Context{}.WithContext(context.Background())
+
+			err := s.ProcessAll(ctx, tt.tasks)
+			if err != tt.expectedErr {
+				t.Errorf("Expected error %v, got %v", tt.expectedErr, err)
+			}
+
+			for _, task := range tt.tasks {
+				if task.Status != tt.expectedStatus {
+					t.Errorf("Expected task status to be %s, got %s", tt.expectedStatus, task.Status)
+				}
+				if task.Incarnation != tt.expectedInc {
+					t.Errorf("Expected task incarnation to be %d, got %d", tt.expectedInc, task.Incarnation)
+				}
+			}
+		})
+	}
+}
Review comment: do we need to store any data here about the "dependent TX" for the ESTIMATE abort case?

Reply: yes, once we do the estimate/validation pieces there are a few things we'll need to track.
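As a hedged sketch of what might be tracked (hypothetical flow, not this PR's code): the estimate abort already records DependentTxIdx, and one natural use of it is deferring re-execution of the aborted transaction until the transaction it depends on has executed again.

```go
package main

import "fmt"

// Hypothetical shape: what an estimate abort records (mirrors Abort.DependentTxIdx).
type estimateAbort struct {
	DependentTxIdx int
}

func main() {
	// Say tx 5 read an ESTIMATE written by tx 2, so its abort records that dependency.
	a := estimateAbort{DependentTxIdx: 2}

	// A scheduler could defer tx 5's re-execution until tx 2 has executed again.
	executed := map[int]bool{2: false}
	canRetry := func() bool { return executed[a.DependentTxIdx] }

	fmt.Println("retry now?", canRetry()) // false: dependency not re-executed yet
	executed[2] = true
	fmt.Println("retry now?", canRetry()) // true
}
```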