-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[occ] Add basic worker task and scheduler shell #328
Conversation
types/occ/scheduler.go
Outdated
Incarnation int | ||
Request types.RequestDeliverTx | ||
Response *types.ResponseDeliverTx | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to store any data here about "dependent TX" for the ESTIMATE abort case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes once we do the estimate/validation pieces there are a few things we'll need to track
types/occ/scheduler.go
Outdated
case <-gCtx.Done(): | ||
return gCtx.Err() | ||
case task, ok := <-ch: | ||
if !ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we refactor this logic under case
to a function like receiveTask
?
types/occ/scheduler.go
Outdated
// TODO: error scenarios | ||
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*Task) error { | ||
ch := make(chan *Task, len(tasks)) | ||
grp, gCtx := errgroup.WithContext(ctx.Context()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we create the context WithCancel
so that we can kill off the workers from the scheduler? we check the context.Done case below, but we would still need to indicate Done
for the context via Cancel right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With errgroups, one can just return an error like context timeout error, and all ctx.Done()s will cancel. If something outside a task needs to cancel, then yes we can wrap errgroup with a WithCancel context, but then it's still going to be the ctx.Done() that exits those routines.
types/occ/scheduler.go
Outdated
return nil | ||
} | ||
//TODO: ensure version multi store is on context | ||
//abortCh := make(chan Abort) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can wrap mvkv here prior to calling the delivertx
types/occ/scheduler.go
Outdated
} | ||
//TODO: ensure version multi store is on context | ||
//abortCh := make(chan Abort) | ||
resp := s.deliverTx(ctx, task.Request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldnt it be better here to call deliverTx
async, otherwise there would be a deadlock since we'd never get to the channel read below? and then below we have a select statement with abortChannel, deliverTx response, or context Canceled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I added a comment on this, but basically if the abort channel is buffered, writes won't block)
types/occ/scheduler.go
Outdated
|
||
// validate returns any that should be re-executed | ||
// note this processes ALL tasks, not just those recently executed | ||
toExecute, err = s.validateAll(ctx, tasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we want validation to also be ongoing and not done after all executions are done, that way if we fail validation for tx X we can immediately re-execute instead of having waited for ALL the txs to have executed before realizing the validation failed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it clear that as soon as validation fails is when we know it should be immediately re-executed? (If we have to prioritize re-executions?) That wasn't yet clear to me because it seems like we have to do a full-validation at the end anyway.
TaskStatusValidated TaskStatus = "validated" | ||
// statusPending tasks are ready for execution | ||
// all executing tasks are in pending state | ||
statusPending status = "pending" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldnt these be StatusPending
so theyre public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at the moment tasks don't leak the scheduler so nothing else cares
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*Task) ([]*Task, error) { | ||
var res []*Task | ||
for _, t := range tasks { | ||
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isnt actually doing any validation, should we rename this to something like collectTasksForValidation (since thats what it appears to be doing?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's going to be doing the validation, it's collecting tasks for re-execution
) | ||
|
||
// Abort contains the information for a transaction's conflict | ||
type Abort struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm ok moving this to scheduler too if we want, otherwise if theres a concern of import cycle we can keep in types so it can be used by both store
and scheduler
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
- Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
- Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
- Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
## Describe your changes and provide context - Adds a basic scheduler shell (see TODOs) - Adds a basic task definition with request/response/index - Listens to abort channel after an execution to determine conflict ## Testing performed to validate your change - Compiles (holding off until shape is validated) - Basic Unit Test for ProcessAll
Describe your changes and provide context
Testing performed to validate your change