Skip to content

Commit

Permalink
more discussion comments
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan committed Nov 25, 2024
1 parent dd58406 commit fea29eb
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions rfc/system/RFC-5659-execution-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ my_lp = LaunchPlan.get_or_create(
We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime and embedding it in the existing [Schedule](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/admin/schedule.proto) message

```protobuf
message Concurrency {
message SchedulerPolicy {
// Defines how many executions with this launch plan can run in parallel
uint32 max = 1;
Expand All @@ -62,7 +62,7 @@ enum ConcurrencyPolicy {
message Schedule {
...
Concurrency concurrency = X;
SchedulerPolicy scheduler_policy = X;
}
// embedded in the ExecutionClosure
Expand All @@ -78,19 +78,22 @@ message ExecutionStateChangeDetails {
// Can also add to ExecutionSpec to specify execution time overrides
```
### Control Plane

### Concurrency Controller Singleton
At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans.

1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy
1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`.
1. or fail the request when the concurrency policy is set to `ABORT`
2. let the concurrency controller manage scheduling
1. Do not create the workflow CRD

### Concurrency Controller Singleton

Introduce the Concurrency Controller to poll for all pending executions:
1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N number of worker threads.
1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.Schedule`
1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;`
1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy`
1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT');`
1. For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine tune in the future to include differentiated priority.
1. For each non-`PENDING` execution returned by the above query, update the map of active executions by launch plan named entity into a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library)
1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier` using an implementation where different keys can be accessed concurrently.
Expand All @@ -115,17 +118,17 @@ Creating an execution
#### Launch Plan informer
This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans.

Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.Schedule
Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.SchedulerPolicy`

Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map.
Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map. If an execution has terminated since the last time the query ran, it won't be in the result set and we'll want to update the in memory map to remove the execution.


### Flyte Admin changes
### Execution Manager
Because we fetch the launch plan to reconcile execution inputs at CreateExecution time, we'll have the concurrency policy available to us at the time of execution creation.
If there is no concurrency policy defined, we'll proceed as [normal](https://github.com/flyteorg/flyte/blob/f14348165ccdfb26f8509c0f1ef380a360e59c4d/flyteadmin/pkg/manager/impl/execution_manager.go#L1169-L1173) and create the workflow execution CRD and then create a database entry for the execution with phase `UNKNOWN`. This way, we don't incur any penalty for executions

If there is a concurrency policy defined, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_
If there is a concurrency policy defined, if it's set to `ABORT` immediately fail the execution. Otherwise, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_


#### Database
Expand All @@ -145,9 +148,9 @@ We should consider adding an index to the executions table to include

##### Concurrency by specified launch plan versions
Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26))
Therefore, this proposal only applies concurrency at the launch plan Named Entity level, that is across (project, domain, version).
However, this proposal only applies concurrency at the launch plan Named Entity level, that is by (project, domain, name) and across all versions. The currently active launch plan version will determine the concurrency policy that gets applied for all executions created with the launch plan NamedEntity.

If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and update the keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier.
If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and add duplicates but with update keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier.
We could update usage like so
Expand All @@ -159,16 +162,16 @@ my_lp = LaunchPlan.get_or_create(
concurrency=Concurrency(
max=1, # defines how many executions with this launch plan can run in parallel
policy=ConcurrencyPolicy.WAIT # defines the policy to apply when the max concurrency is reached
precision=ConcurrencyPrecision.LAUNCH_PLAN
precision=ConcurrencyPrecision.LAUNCH_PLAN_VERSION
)
)
```
and by default, when the precision is omitted the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN_VERSION`
and by default, when the precision is omitted the SDK could register the launch plan using `ConcurrencyPrecision.LAUNCH_PLAN`
We could update the concurrency protobuf definition like so:
```protobuf
message Concurrency {
message SchedulerPolicy {
// Defines how many executions with this launch plan can run in parallel
uint32 max = 1;
Expand Down

0 comments on commit fea29eb

Please sign in to comment.