For a quick introduction on how to build and install the Spark Operator, and how to run some example applications,
please refer to the Quick Start Guide. For a complete reference of the API definition of the
SparkApplication
and ScheduledSparkApplication
custom resources, please refer to the API Specification.
The Spark Operator ships with a command-line tool called sparkctl
that offers additional features beyond what kubectl
is able to do. Documentation on sparkctl
can be found in README. If you are running the Spark Operator on Google Kubernetes Engine and want to use Google Cloud Storage (GCS) and/or BigQuery for reading/writing data, also refer to the GCP guide.
- Using a SparkApplication
- Writing a SparkApplication Spec
- Working with SparkApplications
- Running Spark Applications on a Schedule using a ScheduledSparkApplication
The Spark Operator runs Spark applications specified in Kubernetes objects of the SparkApplication
custom resource type.
The most common way of using a SparkApplication
is store the SparkApplication
specification in a YAML file and use
the kubectl
command or alternatively the sparkctl
command to work with the SparkApplication
. The Spark Operator
automatically submits the application as configured in a SparkApplication
to run on the Kubernetes cluster and uses
the SparkApplication
to collect and surface the status of the driver and executors to the user.
As with all other Kubernetes API objects, a SparkApplication
needs the apiVersion
, kind
, and metadata
fields.
For general information about working with manifests, see
object management using kubectl.
A SparkApplication
also needs a
.spec
section. This section
contains fields for specifying various aspects of an application including its type (Scala
, Java
, Python
, or R
),
deployment mode (clsuter
or client
), main application resource URI (e.g., the URI of the application jar),
main class, arguments, etc. Node selectors are also supported via the optional field .spec.nodeSelector
.
It also has fields for specifying the unified container image (to use for both the driver and executors) and the image
pull policy, namely, .spec.image
and .spec.imagePullPolicy
respectively. If a custom init-container (in both the
driver and executor pods) image needs to be used, the optional field .spec.initContainerImage
can be used to specify
it. If set, .spec.initContainerImage
overrides .spec.image
for the init-container image. Otherwise, the image
specified by .spec.image
will be used for the init-container. It is invalid if both .spec.image
and
.spec.initContainerImage
are not set.
Below is an example showing part of a SparkApplication
specification:
apiVersion: sparkoperator.k8s.io/v1alpha1
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
type: Scala
mode: cluster
image: gcr.io/spark/spark:v2.3.0
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
Often Spark applications need additional files additionally to the main application resource to run. Such application
dependencies can include for example jars and data files the application needs at runtime. When using the spark-submit
script to submit a Spark application, such dependencies are specified using the --jars
and --files
options.
To support specification of application dependenies, a SparkApplication
uses an optional field .spec.deps
that in
turn supports specifying jars and files, respectively. More specifically, the optional fields .spec.deps.jars
and
.spec.deps.files
correspond to the --jars
and --files
options of the spark-submit
script, respectively.
Additionally, .spec.deps
also has fields for specifying the locations in the driver and executor containers where jars
and files should be downloaded to, namely, .spec.deps.jarsDownloadDir
and .spec.deps.filesDownloadDir
. The optional
fields .spec.deps.downloadTimeout
and .spec.deps.maxSimultaneousDownloads
are used to control the timeout and
maximum parallelism of downloading dependencies that are hosted remotely, e.g., on an HTTP server, or in external
storage such as HDFS, Google Cloud Storage, or AWS S3.
The following is an example specification with both container-local (i.e., within the container) and remote dependencies:
spec:
deps:
jars:
- local:///opt/spark-jars/gcs-connector.jar
files:
- gs://spark-data/data-file-1.txt
- gs://spark-data/data-file-2.txt
There are two ways to add Spark configuration: setting individual Spark configuration properties using the optional
field .spec.sparkConf
or mounting a special Kubernetes ConfigMap storing Spark configuration files, e.g.,
spark-defaults.conf
, spark-env.sh
, log4j.properties
, etc. using the optional field .spec.sparkConfigMap
. If
.spec.sparkConfigMap
is used, additionally to mounting the ConfigMap into the driver and executors, the Spark Operator
additionally sets the environment variable SPARK_CONF_DIR
to point to the mount path of the ConfigMap.
spec:
sparkConf:
"spark.ui.port": 4045
"spark.eventLog.enabled": true
"spark.eventLog.dir": hdfs://hdfs-namenode-1:8020/spark/spark-events
There are two ways to add Hadoop configuration: setting individual Hadoop configuration properties using the optional
field .spec.hadoopConf
or mounting a special Kubernetes ConfigMap storing Hadoop configuration files, e.g.,
core-site.xml
, using the optional field .spec.hadoopConfigMap
. The Spark Operator automatically adds the prefix
spark.hadoop.
to the names of individual Hadoop configuration properties in .spec.hadoopConf
. If
.spec.hadoopConfigMap
is used, additionally to mounting the ConfigMap into the driver and executors, the Spark
Operator additionally sets the environment variable HADOOP_CONF_DIR
to point to the mount path of the ConfigMap.
The following is an example showing the use of individual Hadoop configuration properties:
spec:
hadoopConf:
"fs.gs.project.id": spark
"fs.gs.system.bucket": spark
"google.cloud.auth.service.account.enable": true
"google.cloud.auth.service.account.json.keyfile": /mnt/secrets/key.json
The .spec
section of a SparkApplication
has a .spec.driver
field for configuring the driver. It allows users to
set the memory and cpu resources to request for the driver pod, and the container image the driver should use. It also
has fields for optionally specifying labels, annotations, and environment variables for the driver pod. By default, the
driver pod name of an application is automatically generated by the Spark submission client. If instead you want to use
a particular name for the driver pod, the optional field .spec.driver.podName
can be used. The driver pod by default
uses the default
service account in the namespace it is running in to talk to the Kubernetes API server. The default
service account, however, may or may not have sufficient permissions to create executor pods and the headless service
used by the executors to connect to the driver. If it does not and a custom service account that has the right
permissions should be used instead, the optional field .spec.driver.serviceAccount
can be used to specify the name of
the custom service account. When a custom container image is needed for the driver, the field .spec.driver.image
can
be used to specify it. This overrides the image specified in .spec.image
if it is also set. It is invalid if both
.spec.image
and .spec.driver.image
are not set.
For applications that need
to mount Kubernetes Secrets or
ConfigMaps into the driver pod,
fields .spec.driver.secrets
and .spec.driver.configMaps
can be used. For more details, please refer to
Mounting Secrets and Mounting ConfigMaps.
The following is an example driver specification:
spec:
driver:
cores: 0.1
coreLimit: 200m
memory: 512m
labels:
version: 2.3.0
serviceAccount: spark
The .spec
section of a SparkApplication
has a .spec.executor
field for configuring the executors. It allows users
to set the memory and cpu resources to request for the executor pods, and the container image the executors should use.
It also has fields for optionally specifying labels, annotations, and environment variables for the executor pods. By
default, a single executor is requested for an application. If more than one executor are needed, the optional field
.spec.executor.instances
can be used to specify the number of executors to request. When a custom container image is
needed for the executors, the field .spec.executor.image
can be used to specify it. This overrides the image specified
in .spec.image
if it is also set. It is invalid if both .spec.image
and .spec.executor.image
are not set.
For applications that need
to mount Kubernetes Secrets or
ConfigMaps into the executor pods,
fields .spec.executor.secrets
and .spec.executor.configMaps
can be used. For more details, please refer to
Mounting Secrets and Mounting ConfigMaps.
An example executor specification is shown below:
spec:
executor:
cores: 1
instances: 1
memory: 512m
labels:
version: 2.3.0
As mentioned above, both the driver specification and executor specification have an optional field secrets
for configuring
the list of Kubernetes Secrets to be mounted into the driver and executors, respectively. The field is a map with the
names of the Secrets as keys and values specifying the mount path and type of each Secret. For instance, the following
example shows a driver specification with a Secret named gcp-svc-account
of type GCPServiceAccount
to be mounted to
/mnt/secrets
in the driver pod.
spec:
driver:
secrets:
- name: gcp-svc-account
path: /mnt/secrets
secretType: GCPServiceAccount
The type of a Secret as specified by the secretType
field is a hint to the Spark Operator on what extra configuration
it needs to take care of for the specific type of Secrets. For example, if a Secret is of type GCPServiceAccount
, the
Spark Operator additionally sets the environment variable GOOGLE_APPLICATION_CREDENTIALS
to point to the JSON key file
stored in the secret. Please refer to
Getting Started with Authentication for more information
on how to authenticate with GCP services using a service account JSON key file. Note that the Spark Operator assumes
that the key of the service account JSON key file in the Secret data map is key.json
so it is able to set the
environment variable automatically. Similarly, if the type of a Secret is HadoopDelegationToken
, the Spark Operator
additionally sets the environment variable HADOOP_TOKEN_FILE_LOCATION
to point to the file storing the Hadoop
delegation token. In this case, the Spark Operator assumes that the key of the delegation token file in the Secret data
map is hadoop.token
.
Both the driver specification and executor specifications have an optional field for configuring
the list of Kubernetes ConfigMaps to be mounted into the driver and executors, respectively. The field is a map with
keys being the names of the ConfigMaps and values specifying the mount path of each ConfigMap. For instance, the
following example shows a driver specification with a ConfigMap named configmap1
to be mounted to
/mnt/config-maps
in the driver pod.
spec:
driver:
configMaps:
- name: configmap1
path: /mnt/config-maps
Note that the initializer needs to be enabled to use this feature. Please refer to the Quick Start Guide on how to enable the initializer.
A SparkApplication
can specify a Kubernetes ConfigMap storing Spark configuration files such as spark-env.sh
or
spark-defaults.conf
using the optional field .spec.sparkConfigMap
whose value is the name of the ConfigMap. The
ConfigMap is assumed to be in the same namespace as that of the SparkApplication
. The Spark Operator mounts the
ConfigMap onto path /etc/spark/conf
in both the driver and executors. Additionally, it also sets the environment
variable SPARK_CONF_DIR
to point to /etc/spark/conf
in the driver and executors.
Note that the initializer needs to be enabled to use this feature. Please refer to the Quick Start Guide on how to enable the initializer.
A SparkApplication
can specify a Kubernetes ConfigMap storing Hadoop configuration files such as core-site.xml
using the optional field .spec.hadoopConfigMap
whose value is the name of the ConfigMap. The ConfigMap is assumed to
be in the same namespace as that of the SparkApplication
. The Spark Operator mounts the ConfigMap onto path
/etc/hadoop/conf
in both the driver and executors. Additionally, it also sets the environment
variable HADOOP_CONF_DIR
to point to /etc/hadoop/conf
in the driver and executors.
Note that the initializer needs to be enabled to use this feature. Please refer to the Quick Start Guide on how to enable the initializer.
The Spark Operator also supports mounting user-specified Kubernetes volumes into the driver and executors. A
SparkApplication
has an optional field .spec.volumes
for specifying the list of
volumes the driver and the
executors need collectively. Then both the driver and executor specifications have an optional field volumeMounts
that specifies the volume mounts
for the volumes needed by the driver and executors, respectively. The following is an
example showing a SparkApplication
with both driver and executor volume mounts.
spec:
volumes:
- name: spark-data
persistentVolumeClaim:
claimName: my-pvc
- name: spark-work
emptyDir: {}
driver:
volumeMounts:
- name: spark-work
mountPath: /mnt/spark/work
executor:
volumeMounts:
- name: spark-data
mountPath: /mnt/spark/data
- name: spark-work
mountPath: /mnt/spark/work
Note that the initializer needs to be enabled to use this feature. Please refer to the Quick Start Guide on how to enable the initializer.
A SparkApplication
can be created from a YAML file storing the SparkApplication
specification using either the
kubectl apply -f <YAML file path>
command or the sparkctl create <YAML file path>
command. Please refer to the
sparkctl
README for usage of the sparkctl create
command. Once a SparkApplication
is successfully created, the Spark Operator will receive it and submits the application as configured in the
specification to run on the Kubernetes cluster.
A SparkApplication
can be deleted using either the kubectl delete <name>
command or the sparkctl delete <name>
command. Please refer to the sparkctl
README for usage of the sparkctl delete
command. Deleting a SparkApplication
deletes the Spark application associated with it. If the application is running
when the deletion happens, the application is killed and all Kubernetes resources associated with the application are
deleted or garbage collected.
A SparkApplication
can be updated using the kubectl apply -f <updated YAML file>
command. When a SparkApplication
is successfully updated, the Spark Operator will receive both the updated and old SparkApplication
objects. If the
specification of the SparkApplication
has changed, the Spark Operator submits the application to run, using the
updated specification. If the application is currently running, the Spark Operator kills the running application before
submitting a new run with the updated specification. There is planned work to enhance the way SparkApplication
updates
are handled. For example, if the change was to increase the number of executor instances, instead of killing the
currently running application and starting a new run, it is a much better user experience to incrementally launch the
additional executor pods.
A SparkApplication
can be checked using the kubectl describe sparkapplications <name>
command. The output of the
command shows the specification and status of the SparkApplication
as well as events associated with it. The events
communicate the overall process and errors of the SparkApplication
.
The Spark Operator supports automatic application restart with a configurable RestartPolicy
using the optional field
.spec.restartPolicy
, whose valid values include Never
, OnFailure
, and Always
. Upon termination of an application,
the Spark Operator determines if the application is subject to restart based on its termination state and the
RestartPolicy
in the specification. If the application is subject to restart, the Spark Operator restarts it by
submitting a new run of it. The old driver pod is deleted if it still exists before submitting the new run, and a new
driver pod is created by the submission client so effectively the driver gets restarted.
The Spark Operator supports automatically retrying failed submissions. When the Spark Operator failed to submit an
application, it determines if the application is subject to a submission retry based on if the optional field
.spec.maxSubmissionRetries
is set and has a positive value and the number of times it has already retried. If the
maximum submission retries has not been reached, the Spark Operator retries submitting the application using a linear
backoff with the interval specified by .spec.submissionRetryInterval
. If .spec.submissionRetryInterval
is not set,
the Spark Operator retries submitting the application immediately.
The Spark Operator supports running a Spark application on a standard cron
schedule using objects of the ScheduledSparkApplication
custom resource type. A ScheduledSparkApplication
object
specifies a cron schedule on which the application should run and a SparkApplication
template from which
a SparkApplication
object for each run of the application is created. The following is an example
ScheduledSparkApplication
:
apiVersion: "sparkoperator.k8s.io/v1alpha1"
kind: ScheduledSparkApplication
metadata:
name: spark-pi-scheduled
namespace: default
spec:
schedule: "@every 5m"
concurrencyPolicy: Allow
successfulRunHistoryLimit: 1
failedRunHistoryLimit: 3
template:
type: Scala
mode: cluster
image: gcr.io/spark/spark:v2.3.0
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
driver:
cores: 0.5
memory: 512m
executor:
cores: 1
instances: 1
memory: 512m
restartPolicy: Never
The concurrency of runs of an application is controlled by .spec.concurrencyPolicy
, whose valid values are Allow
,
Forbid
, and Replace
, with Allow
being the default. The meanings of each value is described below:
Allow
: more than one run of an application are allowed if for example the next run of the application is due even though the previous run has not completed yet.Forbid
: no more than one run of an application is allowed. The next run of the application can only start if the previous run has completed.Replace
: no more than one run of an application is allowed. When the next run of the application is due, the previous run is killed and the next run starts as a replacement.
A scheduled ScheduledSparkApplication
can be temporarily suspended (no future scheduled runs of the application will
be triggered) by setting .spec.suspend
to true
. The schedule can be resumed by removing .spec.suspend
or setting
it to false
. A ScheduledSparkApplication
can have names of SparkApplication
objects for the past runs of the
application tracked in the Status
section as discussed below. The numbers of past successful runs and past failed runs
to keep track of are controlled by field .spec.successfulRunHistoryLimit
and field .spec.failedRunHistoryLimit
,
respectively. The example above allows 1 past successful run and 3 past failed runs to be tracked.
The Status
section of a ScheduledSparkApplication
object shows the time of the last run and the proposed time of the
next run of the application, through .status.lastRun
and .status.nextRun
, respectively. The names of the
SparkApplication
object for the most recent run (which may or may not be running) of the application are stored in
.status.lastRunName
. The names of SparkApplication
objects of the past successful runs of the application are stored
in .status.pastSuccessfulRunNames
. Similarly, the names of SparkApplication
objects of the past failed runs of
the application are stored in .status.pastFailedRunNames
.
Note that certain restart policies (specified in .spec.template.restartPolicy
) may not work well with the specified
schedule and concurrency policy of a ScheduledSparkApplication
. For example, a restart policy of Always
should never
be used with a ScheduledSparkApplication
. In most cases, a restart policy of OnFailure
may not be a good choice as
the next run usually picks up where the previous run left anyway. For these reasons, it's often the right choice to use
a restart policy of Never
as the example above shows.