Commit
CoarseGrainedExecutorBackend
jaceklaskowski committed Dec 28, 2023
1 parent 170c927 commit 1db7f3d
Showing 4 changed files with 60 additions and 60 deletions.
106 changes: 51 additions & 55 deletions docs/executor/CoarseGrainedExecutorBackend.md
@@ -1,6 +1,10 @@
# CoarseGrainedExecutorBackend

`CoarseGrainedExecutorBackend` is an [ExecutorBackend](ExecutorBackend.md) and an [IsolatedRpcEndpoint](../rpc/RpcEndpoint.md#IsolatedRpcEndpoint).
`CoarseGrainedExecutorBackend` is an [ExecutorBackend](ExecutorBackend.md) that controls the lifecycle of a single [executor](#executor) and sends [executor status updates](#statusUpdate) to the [driver](#driver).

![CoarseGrainedExecutorBackend Sending Task Status Updates to Driver's CoarseGrainedScheduler Endpoint](../images/executor/CoarseGrainedExecutorBackend-statusUpdate.png)

`CoarseGrainedExecutorBackend` is started in a resource container (as a [standalone application](#main)).
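
As an illustration only (the exact arguments and their values vary by cluster manager and Spark version; the hostnames, ids, and port below are made up), the process command line of such a resource container can look roughly like this:

```text
$ java ... org.apache.spark.executor.CoarseGrainedExecutorBackend \
    --driver-url spark://CoarseGrainedScheduler@192.168.1.6:64859 \
    --executor-id 0 \
    --hostname 192.168.1.7 \
    --cores 4 \
    --app-id app-20231228120000-0000
```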

## Creating Instance

@@ -12,48 +16,76 @@
* <span id="bindAddress"> Bind Address (_unused_)
* <span id="hostname"> Hostname
* <span id="cores"> Number of CPU cores
* <span id="userClassPath"> User Classpath (`Seq[URL]`)
* <span id="env"> [SparkEnv](../SparkEnv.md)
* <span id="resourcesFileOpt"> Resources Configuration File
* <span id="resourceProfile"> [ResourceProfile](../stage-level-scheduling/ResourceProfile.md)

!!! note
[driverUrl](#driverUrl), [executorId](#executorId), [hostname](#hostname), [cores](#cores) and [userClassPath](#userClassPath) correspond to `CoarseGrainedExecutorBackend` standalone application's [command-line arguments](#command-line-arguments).

`CoarseGrainedExecutorBackend` is created when:
`CoarseGrainedExecutorBackend` is created upon launching the [CoarseGrainedExecutorBackend](#run) standalone application.

* `CoarseGrainedExecutorBackend` standalone application is [launched](#run)
## Executor { #executor }

## <span id="decommissionSelf"> decommissionSelf
`CoarseGrainedExecutorBackend` manages the lifecycle of a single [Executor](Executor.md):

```scala
decommissionSelf(): Unit
```
* An `Executor` is [created](Executor.md#creating-instance) upon [receiving a RegisteredExecutor message](#RegisteredExecutor)
* The `Executor` is [stopped](Executor.md#stop) upon [receiving a Shutdown message](#Shutdown) (on a separate `CoarseGrainedExecutorBackend-stop-executor` thread)

The `Executor` is used for the following:

* [decommissionSelf](#decommissionSelf)
* [Launching a task](Executor.md#launchTask) (upon [receiving a LaunchTask message](#LaunchTask))
* [Killing a task](Executor.md#killTask) (upon [receiving a KillTask message](#KillTask))
* Reporting the number of CPU cores used for a given task in [statusUpdate](#statusUpdate)
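
The create-on-`RegisteredExecutor`, stop-on-`Shutdown` lifecycle above can be sketched as a toy model (`ToyExecutor` and `ToyBackend` are illustrative stand-ins, not Spark classes):

```scala
// Toy model of the RegisteredExecutor / Shutdown handling described above.
sealed trait Message
case object RegisteredExecutor extends Message
case object Shutdown extends Message

class ToyExecutor {
  @volatile var stopped = false
  def stop(): Unit = stopped = true
}

class ToyBackend {
  var executor: Option[ToyExecutor] = None

  def receive(msg: Message): Unit = msg match {
    case RegisteredExecutor =>
      // The executor is created only once the driver confirms registration
      executor = Some(new ToyExecutor)
    case Shutdown =>
      // Spark stops the executor on a dedicated stop thread; shown inline here
      executor.foreach(_.stop())
  }
}
```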

## Reporting Task Status { #statusUpdate }

`decommissionSelf`...FIXME
??? note "ExecutorBackend"

`decommissionSelf` is used when:
```scala
statusUpdate(
  taskId: Long,
  state: TaskState,
  data: ByteBuffer): Unit
```

* `CoarseGrainedExecutorBackend` is requested to [handle a DecommissionExecutor message](#DecommissionExecutor)
`statusUpdate` is part of the [ExecutorBackend](ExecutorBackend.md#statusUpdate) abstraction.

`statusUpdate`...FIXME

<!---
`statusUpdate` creates a [StatusUpdate](../scheduler/DriverEndpoint.md#StatusUpdate) (with the input `taskId`, `state`, and `data` together with the <<executorId, executor id>>) and sends it to the <<driver, driver>> (if connected already).
.CoarseGrainedExecutorBackend Sending Task Status Updates to Driver's CoarseGrainedScheduler Endpoint
image::CoarseGrainedExecutorBackend-statusUpdate.png[align="center"]
-->
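
Pending the details above, a hedged sketch of the flow (with illustrative names; `StatusUpdateMsg`, `ToyDriverRef` and `ToyExecutorBackend` stand in for Spark's `StatusUpdate` message, the driver `RpcEndpointRef` and the backend itself): wrap the task id, state and serialized data in a message and send it to the driver endpoint, if connected.

```scala
import java.nio.ByteBuffer

// Illustrative model only, not Spark's actual implementation.
case class StatusUpdateMsg(executorId: String, taskId: Long, state: String, data: ByteBuffer)

class ToyDriverRef {
  var sent: List[StatusUpdateMsg] = Nil
  def send(msg: StatusUpdateMsg): Unit = sent = msg :: sent
}

class ToyExecutorBackend(executorId: String, driver: Option[ToyDriverRef]) {
  def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit =
    driver match {
      case Some(ref) =>
        ref.send(StatusUpdateMsg(executorId, taskId, state, data))
      case None =>
        // Spark logs a WARN and drops the update when not yet connected
        println(s"Drop StatusUpdateMsg($taskId, $state) because has not yet connected to driver")
    }
}
```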

## Messages

### <span id="DecommissionExecutor"> DecommissionExecutor
### DecommissionExecutor { #DecommissionExecutor }

`DecommissionExecutor` is sent out when `CoarseGrainedSchedulerBackend` is requested to [decommissionExecutors](../scheduler/CoarseGrainedSchedulerBackend.md#decommissionExecutors).

When received, `CoarseGrainedExecutorBackend` [decommissions itself](#decommissionSelf).

## Review Me
## Logging

CoarseGrainedExecutorBackend is an executor:ExecutorBackend.md[] that controls the lifecycle of a single <<executor, executor>> and sends <<statusUpdate, the executor's status updates>> to the driver.
Enable `ALL` logging level for `org.apache.spark.executor.CoarseGrainedExecutorBackend` logger to see what happens inside.

.CoarseGrainedExecutorBackend Sending Task Status Updates to Driver's CoarseGrainedScheduler Endpoint
image::CoarseGrainedExecutorBackend-statusUpdate.png[align="center"]
Add the following line to `conf/log4j2.properties`:

CoarseGrainedExecutorBackend is a rpc:RpcEndpoint.md#ThreadSafeRpcEndpoint[ThreadSafeRpcEndpoint] that <<onStart, connects to the driver>> (before accepting <<messages, messages>>) and <<onDisconnected, shuts down when the driver disconnects>>.
```text
logger.CoarseGrainedExecutorBackend.name = org.apache.spark.executor.CoarseGrainedExecutorBackend
logger.CoarseGrainedExecutorBackend.level = all
```

Refer to [Logging](../spark-logging.md).

<!---
## Review Me
CoarseGrainedExecutorBackend is started in a resource container (as a <<main, standalone application>>).
CoarseGrainedExecutorBackend is a rpc:RpcEndpoint.md#ThreadSafeRpcEndpoint[ThreadSafeRpcEndpoint] that <<onStart, connects to the driver>> (before accepting <<messages, messages>>) and <<onDisconnected, shuts down when the driver disconnects>>.
When <<run, started>>, CoarseGrainedExecutorBackend <<creating-instance, registers the Executor RPC endpoint>> to communicate with the driver (with [DriverEndpoint](../scheduler/DriverEndpoint.md)).
@@ -115,26 +147,6 @@ Received LaunchTask command but executor was null
NOTE: `LaunchTask` is sent when `CoarseGrainedSchedulerBackend` is requested to [launch tasks](../scheduler/DriverEndpoint.md#launchTasks) (one `LaunchTask` per task).
== [[statusUpdate]] Sending Task Status Updates to Driver -- `statusUpdate` Method

[source, scala]
----
statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
----

NOTE: `statusUpdate` is part of executor:ExecutorBackend.md#statusUpdate[ExecutorBackend Contract] to send task status updates to a scheduler (on the driver).

`statusUpdate` creates a [StatusUpdate](../scheduler/DriverEndpoint.md#StatusUpdate) (with the input `taskId`, `state`, and `data` together with the <<executorId, executor id>>) and sends it to the <<driver, driver>> (if connected already).

.CoarseGrainedExecutorBackend Sending Task Status Updates to Driver's CoarseGrainedScheduler Endpoint
image::CoarseGrainedExecutorBackend-statusUpdate.png[align="center"]

When no <<driver, driver>> is available, you should see the following WARN message in the logs:

```
WARN Drop [msg] because has not yet connected to driver
```

== [[driverURL]] Driver's URL
The driver's URL is of the format `spark://[RpcEndpoint name]@[hostname]:[port]`, e.g. `spark://CoarseGrainedScheduler@[hostname]:64859`.
@@ -443,19 +455,6 @@
CAUTION: FIXME
== [[logging]] Logging

Enable `ALL` logging level for `org.apache.spark.executor.CoarseGrainedExecutorBackend` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

[source,plaintext]
----
log4j.logger.org.apache.spark.executor.CoarseGrainedExecutorBackend=ALL
----

Refer to spark-logging.md[Logging].

== [[internal-properties]] Internal Properties
=== [[ser]] SerializerInstance
@@ -466,10 +465,6 @@ Initialized when <<creating-instance, CoarseGrainedExecutorBackend is created>>.
NOTE: CoarseGrainedExecutorBackend uses the input `env` to core:SparkEnv.md#closureSerializer[access `closureSerializer`].
=== [[driver]] Driver RpcEndpointRef

rpc:RpcEndpointRef.md[RpcEndpointRef] of the driver

=== [[stopping]] stopping Flag
Enabled when CoarseGrainedExecutorBackend gets notified to <<StopExecutor, stop itself>> or <<Shutdown, shut down the managed executor>>.
Expand All @@ -483,3 +478,4 @@ Used when CoarseGrainedExecutorBackend RPC Endpoint gets notified that <<onDisco
Single coarse-grained executor:Executor.md#coarse-grained-executor[Executor] managed exclusively by the CoarseGrainedExecutorBackend to forward <<LaunchTask, launch>> and <<KillTask, kill>> task requests from the driver.
Initialized after CoarseGrainedExecutorBackend <<RegisteredExecutor, has registered with `CoarseGrainedSchedulerBackend`>> and stopped when CoarseGrainedExecutorBackend gets requested to <<Shutdown, shut down>>.
-->
10 changes: 7 additions & 3 deletions docs/executor/ExecutorBackend.md
@@ -1,14 +1,14 @@
# ExecutorBackend

`ExecutorBackend` is an [abstraction](#contract) of [executor backends](#implementations) (that [TaskRunner](TaskRunner.md)s use to [send task status updates](#statusUpdate) to a scheduler).
`ExecutorBackend` is an [abstraction](#contract) of [executor backends](#implementations) (that [TaskRunner](TaskRunner.md)s use to [report task status updates](#statusUpdate) to a scheduler).

![ExecutorBackend receives notifications from TaskRunners](../images/executor/ExecutorBackend.png)

`ExecutorBackend` acts as a bridge between executors and the driver.

## Contract

### <span id="statusUpdate"> statusUpdate
### Reporting Task Status { #statusUpdate }

```scala
statusUpdate(
  taskId: Long,
  state: TaskState,
  data: ByteBuffer): Unit
```

Sending a status update to a scheduler
Reports task status of the given task to a scheduler

See:

* [CoarseGrainedExecutorBackend](CoarseGrainedExecutorBackend.md#statusUpdate)

Used when:

2 changes: 1 addition & 1 deletion docs/shuffle/ExternalSorter.md
@@ -15,7 +15,7 @@

* <span id="context"> [TaskContext](../scheduler/TaskContext.md)
* <span id="aggregator"> Optional [Aggregator](../rdd/Aggregator.md) (default: undefined)
* <span id="partitioner"> Optional [Partitioner](../rdd/Partitioner) (default: undefined)
* <span id="partitioner"> Optional [Partitioner](../rdd/Partitioner.md) (default: undefined)
* <span id="ordering"> Optional `Ordering` ([Scala]({{ scala.api }}/scala/math/Ordering.html)) for keys (default: undefined)
* <span id="serializer"> [Serializer](../serializer/Serializer.md) (default: [Serializer](../SparkEnv.md#serializer))

2 changes: 1 addition & 1 deletion docs/storage/DiskStore.md
@@ -20,7 +20,7 @@
blockSizes: ConcurrentHashMap[BlockId, Long]
```

`DiskStore` uses `ConcurrentHashMap` ([Java]({{ java.api }}/java/util/concurrent/ConcurrentHashMap.html)) as a registry of [block](BlockId)s and the data size (on disk).
`DiskStore` uses `ConcurrentHashMap` ([Java]({{ java.api }}/java/util/concurrent/ConcurrentHashMap.html)) as a registry of [block](BlockId.md)s and the data size (on disk).

A new entry is added when [put](#put) and [moveFileToBlock](#moveFileToBlock).
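
As a hedged sketch of the registry described above (simplifying `BlockId` to `String`; the helper names are illustrative, not `DiskStore`'s actual methods):

```scala
import java.util.concurrent.ConcurrentHashMap

// Toy model of the blockSizes registry: block id -> size on disk (in bytes).
val blockSizes = new ConcurrentHashMap[String, Long]()

// Record the on-disk size of a block (as put / moveFileToBlock would)
def put(blockId: String, numBytes: Long): Unit =
  blockSizes.put(blockId, numBytes)

// Look up a block's size, 0 when unknown
def getSize(blockId: String): Long =
  blockSizes.getOrDefault(blockId, 0L)
```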

