-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[occ] Add basic worker task and scheduler shell #328
Changes from all commits
a6a9d4a
edf17c5
8ed5ba9
459ccd5
b06373e
244f21d
6f18c42
7260454
a4b4a7f
e8be7d5
37bb06e
588f1f6
aea0cf9
a16fbc2
c1a37b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
package tasks | ||
|
||
import ( | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
"github.com/tendermint/tendermint/abci/types" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// status is the lifecycle state of a deliverTxTask as it moves through
// the execute/validate loop in ProcessAll.
type status string

const (
	// statusPending tasks are ready for execution
	// all executing tasks are in pending state
	statusPending status = "pending"
	// statusExecuted tasks are ready for validation
	// these tasks did not abort during execution
	statusExecuted status = "executed"
	// statusAborted means the task has been aborted
	// these tasks transition to pending upon next execution
	statusAborted status = "aborted"
	// statusValidated means the task has been validated
	// tasks in this status can be reset if an earlier task fails validation
	statusValidated status = "validated"
)
|
||
// deliverTxTask tracks a single transaction through the OCC
// execute/validate cycle.
type deliverTxTask struct {
	// Status is the task's current lifecycle state (see the status consts).
	Status status
	// Index is the task's position in the original request slice; it is
	// also the order responses are returned in.
	Index int
	// Incarnation counts how many times the task has been re-executed
	// after failing validation (incremented in ProcessAll).
	Incarnation int
	// Request is the transaction to deliver.
	Request types.RequestDeliverTx
	// Response is the result of the most recent execution; nil until the
	// task has executed, and reset to nil when the task is re-queued.
	Response *types.ResponseDeliverTx
}
|
||
// Scheduler processes tasks concurrently
// ProcessAll delivers all requests and returns one response per request,
// in the same order as reqs.
type Scheduler interface {
	ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error)
}
|
||
// scheduler is the concrete Scheduler implementation.
type scheduler struct {
	// deliverTx executes a single transaction; supplied by the caller.
	deliverTx func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)
	// workers bounds concurrent executions; a value < 1 means unlimited
	// (one goroutine per task) — see executeAll.
	workers int
}
|
||
// NewScheduler creates a new scheduler | ||
func NewScheduler(workers int, deliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)) Scheduler { | ||
return &scheduler{ | ||
workers: workers, | ||
deliverTx: deliverTxFunc, | ||
} | ||
} | ||
|
||
func toTasks(reqs []types.RequestDeliverTx) []*deliverTxTask { | ||
res := make([]*deliverTxTask, 0, len(reqs)) | ||
for idx, r := range reqs { | ||
res = append(res, &deliverTxTask{ | ||
Request: r, | ||
Index: idx, | ||
Status: statusPending, | ||
}) | ||
} | ||
return res | ||
} | ||
|
||
func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { | ||
res := make([]types.ResponseDeliverTx, 0, len(tasks)) | ||
for _, t := range tasks { | ||
res = append(res, *t.Response) | ||
} | ||
return res | ||
} | ||
|
||
func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) { | ||
tasks := toTasks(reqs) | ||
toExecute := tasks | ||
for len(toExecute) > 0 { | ||
|
||
// execute sets statuses of tasks to either executed or aborted | ||
err := s.executeAll(ctx, toExecute) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// validate returns any that should be re-executed | ||
// note this processes ALL tasks, not just those recently executed | ||
toExecute, err = s.validateAll(ctx, tasks) | ||
if err != nil { | ||
return nil, err | ||
} | ||
for _, t := range toExecute { | ||
t.Incarnation++ | ||
t.Status = statusPending | ||
t.Response = nil | ||
//TODO: reset anything that needs resetting | ||
} | ||
} | ||
return collectResponses(tasks), nil | ||
} | ||
|
||
// TODO: validate each tasks | ||
// TODO: return list of tasks that are invalid | ||
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this isnt actually doing any validation, should we rename this to something like collectTasksForValidation (since thats what it appears to be doing?) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's going to be doing the validation, it's collecting tasks for re-execution |
||
var res []*deliverTxTask | ||
|
||
// find first non-validated entry | ||
var startIdx int | ||
for idx, t := range tasks { | ||
if t.Status != statusValidated { | ||
startIdx = idx | ||
break | ||
} | ||
} | ||
|
||
for i := startIdx; i < len(tasks); i++ { | ||
// any aborted tx is known to be suspect here | ||
if tasks[i].Status == statusAborted { | ||
res = append(res, tasks[i]) | ||
} else { | ||
//TODO: validate the tasks and add it if invalid | ||
//TODO: create and handle abort for validation | ||
tasks[i].Status = statusValidated | ||
} | ||
} | ||
return res, nil | ||
} | ||
|
||
// ExecuteAll executes all tasks concurrently | ||
// Tasks are updated with their status | ||
// TODO: retries on aborted tasks | ||
// TODO: error scenarios | ||
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error { | ||
ch := make(chan *deliverTxTask, len(tasks)) | ||
grp, gCtx := errgroup.WithContext(ctx.Context()) | ||
|
||
// a workers value < 1 means no limit | ||
workers := s.workers | ||
if s.workers < 1 { | ||
workers = len(tasks) | ||
} | ||
|
||
for i := 0; i < workers; i++ { | ||
grp.Go(func() error { | ||
for { | ||
select { | ||
case <-gCtx.Done(): | ||
return gCtx.Err() | ||
case task, ok := <-ch: | ||
if !ok { | ||
return nil | ||
} | ||
//TODO: ensure version multi store is on context | ||
// buffered so it doesn't block on write | ||
// abortCh := make(chan occ.Abort, 1) | ||
|
||
//TODO: consume from abort in non-blocking way (give it a length) | ||
resp := s.deliverTx(ctx, task.Request) | ||
|
||
// close(abortCh) | ||
|
||
//if _, ok := <-abortCh; ok { | ||
// tasks.status = TaskStatusAborted | ||
// continue | ||
//} | ||
|
||
task.Status = statusExecuted | ||
task.Response = &resp | ||
} | ||
} | ||
}) | ||
} | ||
grp.Go(func() error { | ||
defer close(ch) | ||
for _, task := range tasks { | ||
select { | ||
case <-gCtx.Done(): | ||
return gCtx.Err() | ||
case ch <- task: | ||
} | ||
} | ||
return nil | ||
}) | ||
|
||
if err := grp.Wait(); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
package tasks | ||
|
||
import ( | ||
"context" | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/tendermint/tendermint/abci/types" | ||
"testing" | ||
) | ||
|
||
// mockDeliverTxFunc adapts a plain function so tests can hand the
// scheduler a deliverTx implementation via a method value.
type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx

// DeliverTx satisfies the scheduler's deliverTx signature by delegating
// to the underlying function.
func (f mockDeliverTxFunc) DeliverTx(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
	return f(ctx, req)
}
|
||
func requestList(n int) []types.RequestDeliverTx { | ||
tasks := make([]types.RequestDeliverTx, n) | ||
for i := 0; i < n; i++ { | ||
tasks[i] = types.RequestDeliverTx{} | ||
} | ||
return tasks | ||
} | ||
|
||
func TestProcessAll(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
workers int | ||
requests []types.RequestDeliverTx | ||
deliverTxFunc mockDeliverTxFunc | ||
expectedErr error | ||
}{ | ||
{ | ||
name: "All tasks processed without aborts", | ||
workers: 2, | ||
requests: requestList(5), | ||
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx { | ||
return types.ResponseDeliverTx{} | ||
}, | ||
expectedErr: nil, | ||
}, | ||
//TODO: Add more test cases | ||
} | ||
|
||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
s := NewScheduler(tt.workers, tt.deliverTxFunc.DeliverTx) | ||
ctx := sdk.Context{}.WithContext(context.Background()) | ||
|
||
res, err := s.ProcessAll(ctx, tt.requests) | ||
if err != tt.expectedErr { | ||
t.Errorf("Expected error %v, got %v", tt.expectedErr, err) | ||
} else { | ||
// response for each request exists | ||
assert.Len(t, res, len(tt.requests)) | ||
} | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,14 @@ | ||
package scheduler | ||
package occ | ||
|
||
import "errors" | ||
import ( | ||
"errors" | ||
) | ||
|
||
var (
	// ErrReadEstimate signals that a multiversion-store read hit an
	// estimate value, so the reading transaction must abort.
	ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting")
)
|
||
// define the return struct for abort due to conflict | ||
// Abort contains the information for a transaction's conflict | ||
type Abort struct { | ||
DependentTxIdx int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm ok moving this to scheduler too if we want, otherwise if theres a concern of import cycle we can keep in types so it can be used by both |
||
Err error | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldnt these be
StatusPending
so theyre public?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at the moment tasks don't leak the scheduler so nothing else cares