Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[occ] Add basic worker task and scheduler shell #328

Merged
merged 15 commits into from
Oct 17, 2023
25 changes: 18 additions & 7 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/cosmos/cosmos-sdk/tasks"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -237,13 +238,23 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
// DeliverTxBatch executes multiple txs
// TODO: support occ logic with scheduling
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
// TODO: replace with actual scheduler logic
// This is stubbed so that it does something sensible
responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
//TODO: inject multiversion store without import cycle (figure out right place for this)
// ctx = ctx.WithMultiVersionStore(multiversion.NewMultiVersionStore())

reqList := make([]abci.RequestDeliverTx, 0, len(req.TxEntries))
for _, tx := range req.TxEntries {
responses = append(responses, &sdk.DeliverTxResult{
Response: app.DeliverTx(ctx, tx.Request),
})
reqList = append(reqList, tx.Request)
}

scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.DeliverTx)
txRes, err := scheduler.ProcessAll(ctx, reqList)
if err != nil {
//TODO: handle error
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
for _, tx := range txRes {
responses = append(responses, &sdk.DeliverTxResult{Response: tx})
}
return sdk.DeliverTxBatchResponse{Results: responses}
}
Expand All @@ -254,7 +265,7 @@ func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchReques
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
// TODO: (occ) this is the function called from sei-chain to perform execution of a transaction.
// We'd likely replace this with an execution task that is scheduled by the OCC scheduler
// We'd likely replace this with an execution tasks that is scheduled by the OCC scheduler
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
defer func() {
Expand Down
187 changes: 187 additions & 0 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package tasks

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/abci/types"
"golang.org/x/sync/errgroup"
)

type status string

const (
// statusPending tasks are ready for execution
// all executing tasks are in pending state
statusPending status = "pending"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldnt these be StatusPending so theyre public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the moment tasks don't leak the scheduler so nothing else cares

// statusExecuted tasks are ready for validation
// these tasks did not abort during execution
statusExecuted status = "executed"
// statusAborted means the task has been aborted
// these tasks transition to pending upon next execution
statusAborted status = "aborted"
// statusValidated means the task has been validated
// tasks in this status can be reset if an earlier task fails validation
statusValidated status = "validated"
)

type deliverTxTask struct {
Status status
Index int
Incarnation int
Request types.RequestDeliverTx
Response *types.ResponseDeliverTx
}

// Scheduler processes tasks concurrently
type Scheduler interface {
ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error)
}

type scheduler struct {
deliverTx func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)
workers int
}

// NewScheduler creates a new scheduler
func NewScheduler(workers int, deliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)) Scheduler {
return &scheduler{
workers: workers,
deliverTx: deliverTxFunc,
}
}

func toTasks(reqs []types.RequestDeliverTx) []*deliverTxTask {
res := make([]*deliverTxTask, 0, len(reqs))
for idx, r := range reqs {
res = append(res, &deliverTxTask{
Request: r,
Index: idx,
Status: statusPending,
})
}
return res
}

func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
res := make([]types.ResponseDeliverTx, 0, len(tasks))
for _, t := range tasks {
res = append(res, *t.Response)
}
return res
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) {
tasks := toTasks(reqs)
toExecute := tasks
for len(toExecute) > 0 {

// execute sets statuses of tasks to either executed or aborted
err := s.executeAll(ctx, toExecute)
if err != nil {
return nil, err
}

// validate returns any that should be re-executed
// note this processes ALL tasks, not just those recently executed
toExecute, err = s.validateAll(ctx, tasks)
if err != nil {
return nil, err
}
for _, t := range toExecute {
t.Incarnation++
t.Status = statusPending
t.Response = nil
//TODO: reset anything that needs resetting
}
}
return collectResponses(tasks), nil
}

// TODO: validate each tasks
// TODO: return list of tasks that are invalid
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isnt actually doing any validation, should we rename this to something like collectTasksForValidation (since thats what it appears to be doing?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's going to be doing the validation, it's collecting tasks for re-execution

var res []*deliverTxTask

// find first non-validated entry
var startIdx int
for idx, t := range tasks {
if t.Status != statusValidated {
startIdx = idx
break
}
}

for i := startIdx; i < len(tasks); i++ {
// any aborted tx is known to be suspect here
if tasks[i].Status == statusAborted {
res = append(res, tasks[i])
} else {
//TODO: validate the tasks and add it if invalid
//TODO: create and handle abort for validation
tasks[i].Status = statusValidated
}
}
return res, nil
}

// ExecuteAll executes all tasks concurrently
// Tasks are updated with their status
// TODO: retries on aborted tasks
// TODO: error scenarios
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
ch := make(chan *deliverTxTask, len(tasks))
grp, gCtx := errgroup.WithContext(ctx.Context())

// a workers value < 1 means no limit
workers := s.workers
if s.workers < 1 {
workers = len(tasks)
}

for i := 0; i < workers; i++ {
grp.Go(func() error {
for {
select {
case <-gCtx.Done():
return gCtx.Err()
case task, ok := <-ch:
if !ok {
return nil
}
//TODO: ensure version multi store is on context
// buffered so it doesn't block on write
// abortCh := make(chan occ.Abort, 1)

//TODO: consume from abort in non-blocking way (give it a length)
resp := s.deliverTx(ctx, task.Request)

// close(abortCh)

//if _, ok := <-abortCh; ok {
// tasks.status = TaskStatusAborted
// continue
//}

task.Status = statusExecuted
task.Response = &resp
}
}
})
}
grp.Go(func() error {
defer close(ch)
for _, task := range tasks {
select {
case <-gCtx.Done():
return gCtx.Err()
case ch <- task:
}
}
return nil
})

if err := grp.Wait(); err != nil {
return err
}

return nil
}
59 changes: 59 additions & 0 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tasks

import (
"context"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/abci/types"
"testing"
)

type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx

func (f mockDeliverTxFunc) DeliverTx(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return f(ctx, req)
}

func requestList(n int) []types.RequestDeliverTx {
tasks := make([]types.RequestDeliverTx, n)
for i := 0; i < n; i++ {
tasks[i] = types.RequestDeliverTx{}
}
return tasks
}

func TestProcessAll(t *testing.T) {
tests := []struct {
name string
workers int
requests []types.RequestDeliverTx
deliverTxFunc mockDeliverTxFunc
expectedErr error
}{
{
name: "All tasks processed without aborts",
workers: 2,
requests: requestList(5),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return types.ResponseDeliverTx{}
},
expectedErr: nil,
},
//TODO: Add more test cases
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := NewScheduler(tt.workers, tt.deliverTxFunc.DeliverTx)
ctx := sdk.Context{}.WithContext(context.Background())

res, err := s.ProcessAll(ctx, tt.requests)
if err != tt.expectedErr {
t.Errorf("Expected error %v, got %v", tt.expectedErr, err)
} else {
// response for each request exists
assert.Len(t, res, len(tt.requests))
}
})
}
}
8 changes: 5 additions & 3 deletions types/occ/scheduler.go → types/occ/types.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package scheduler
package occ

import "errors"
import (
"errors"
)

var (
ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting")
)

// define the return struct for abort due to conflict
// Abort contains the information for a transaction's conflict
type Abort struct {
DependentTxIdx int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm ok moving this to scheduler too if we want, otherwise if theres a concern of import cycle we can keep in types so it can be used by both store and scheduler

Err error
Expand Down
Loading