- Getting Started
- Deploying Remote Shuffle Service Cluster
- Submitting a Flink Job
- Logging & Configuration
This page describes how to deploy the remote shuffle service on Kubernetes. You can use the released image directly: docker.io/flinkremoteshuffle/flink-remote-shuffle:VERSION. Note that you need to replace the 'VERSION' field with the actual version you want to use, for example, 1.0.0.
Kubernetes is a popular container-orchestration system for automating application deployment, scaling, and management. Remote shuffle service allows you to directly deploy the services on a running Kubernetes cluster.
The Getting Started section assumes that your environment fulfills the following requirements:
- A functional Kubernetes cluster (Kubernetes >= 1.13).
- A running Zookeeper cluster. You can refer to setting up a Zookeeper cluster to start one manually.
- The latest binary release of the remote shuffle service, either downloaded or built from source yourself.
If you have problems setting up a Kubernetes cluster, take a look at how to setup a Kubernetes cluster.
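Since Kubernetes >= 1.13 is required, it may be worth checking the cluster version before proceeding. A minimal sketch of such a check is shown below; the `minor` value is a hard-coded sample here, while in a real environment it would be read from `kubectl version` as shown in the comment:

```shell
# Sketch of a version check. In practice, read the server's minor version from
# the cluster, for example:
#   minor=$(kubectl version -o json | grep -o '"minor": *"[0-9]*' | tail -1 | grep -o '[0-9]*')
minor="22"  # sample value used here for illustration only

if [ "$minor" -ge 13 ]; then
    echo "Kubernetes version OK (>= 1.13)"
else
    echo "Kubernetes cluster is too old (< 1.13)"
fi
```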
The remote shuffle service cluster contains a `ShuffleManager` and multiple `ShuffleWorker`s. The `ShuffleManager` runs as a Kubernetes Deployment (the number of replicas is 1), and the shuffle workers run as a Kubernetes DaemonSet, which means the number of `ShuffleWorker`s is the same as the number of machines in the Kubernetes cluster. The following two points should be noted here:
- Currently, only the host network is supported for network communication.
- The shuffle workers use a hostPath volume (specified by `remote-shuffle.kubernetes.worker.volume.host-paths`) or an emptyDir volume (specified by `remote-shuffle.kubernetes.worker.volume.empty-dirs`) for shuffle data storage.
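As an illustration of the two storage choices, the corresponding dynamic-config entries might look like the following. The paths are placeholders, and the value format shown for `empty-dirs` is an assumption modeled on the `host-paths` example later on this page; consult the configuration page for the exact syntax:

```yaml
# hostPath volume: shuffle data lives on the node's disk and survives pod restarts.
remote-shuffle.kubernetes.worker.volume.host-paths: name:disk,path:/mnt/disk1,mountPath:/data
# emptyDir volume: data is removed together with the pod.
# NOTE: this value format is an assumption; check the configuration page.
remote-shuffle.kubernetes.worker.volume.empty-dirs: name:disk,mountPath:/data
```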
Additionally, to make it easier to deploy on a Kubernetes cluster, we provide a Kubernetes operator for the remote shuffle service, which controls the life cycle of remote shuffle service instances, including creation, deletion, and upgrade.
Once you have your Kubernetes cluster ready and `kubectl` is configured to point to it, you can launch the operator via:

```shell
# Note: You must configure the docker image to be used by modifying the template file first before running this command.
kubectl apply -f kubernetes-shuffle-operator-template.yaml
```
The template file `kubernetes-shuffle-operator-template.yaml` should be in the `conf/` directory and its contents are as follows:
```yaml
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: flink-rss-cr
rules:
  - apiGroups: ["apiextensions.k8s.io"]
    resources:
      - customresourcedefinitions
    verbs:
      - '*'
  - apiGroups: ["shuffleoperator.alibaba.com"]
    resources:
      - remoteshuffles
    verbs:
      - '*'
  - apiGroups: ["shuffleoperator.alibaba.com"]
    resources:
      - remoteshuffles/status
    verbs:
      - update
  - apiGroups: ["apps"]
    resources:
      - deployments
      - daemonsets
    verbs:
      - '*'
  - apiGroups: [""]
    resources:
      - configmaps
    verbs:
      - '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink-rss-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: flink-rss-cr
subjects:
  - kind: ServiceAccount
    name: flink-rss-sa
    namespace: flink-system-rss
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-rss-sa
  namespace: flink-system-rss
---
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: flink-system-rss
  name: flink-remote-shuffle-operator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-remote-shuffle-operator
  template:
    metadata:
      labels:
        app: flink-remote-shuffle-operator
    spec:
      serviceAccountName: flink-rss-sa
      containers:
        - name: flink-remote-shuffle-operator
          image: <docker image> # You need to configure the docker image to be used here.
          imagePullPolicy: Always
          command:
            - bash
          args:
            - -c
            - $JAVA_HOME/bin/java -classpath '/flink-remote-shuffle/opt/*' -Dlog4j.configurationFile=file:/flink-remote-shuffle/conf/log4j2-operator.properties -Dlog.file=/flink-remote-shuffle/log/operator.log com.alibaba.flink.shuffle.kubernetes.operator.RemoteShuffleApplicationOperatorEntrypoint
```
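Note that the template above references the `flink-system-rss` namespace but does not itself create it. If that namespace does not exist yet in your cluster, a manifest like the following sketch can create it before applying the template:

```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: flink-system-rss
```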
Then you can start the `ShuffleManager` and `ShuffleWorker`s via:

```shell
# Note: You must complete the template file first before running this command.
kubectl apply -f kubernetes-shuffle-cluster-template.yaml
```
The template file `kubernetes-shuffle-cluster-template.yaml` should be in the `conf/` directory and its contents are as follows:
```yaml
apiVersion: shuffleoperator.alibaba.com/v1
kind: RemoteShuffle
metadata:
  namespace: flink-system-rss
  name: flink-remote-shuffle
spec:
  shuffleDynamicConfigs:
    remote-shuffle.manager.jvm-opts: -verbose:gc -Xloggc:/flink-remote-shuffle/log/gc.log
    remote-shuffle.worker.jvm-opts: -verbose:gc -Xloggc:/flink-remote-shuffle/log/gc.log
    remote-shuffle.kubernetes.manager.cpu: 4
    remote-shuffle.kubernetes.worker.cpu: 4
    remote-shuffle.kubernetes.worker.limit-factor.cpu: 8
    remote-shuffle.kubernetes.container.image: <docker image>
    remote-shuffle.kubernetes.worker.volume.host-paths: name:disk,path:<dir on host>,mountPath:/data
    remote-shuffle.storage.local-data-dirs: '[SSD]/data'
    remote-shuffle.high-availability.mode: ZOOKEEPER
    remote-shuffle.ha.zookeeper.quorum: <zookeeper quorum>
    remote-shuffle.kubernetes.manager.env-vars: <env-vars> # You need to configure your time zone here, for example, TZ:Asia/Shanghai.
    remote-shuffle.kubernetes.worker.env-vars: <env-vars> # You need to configure your time zone here, for example, TZ:Asia/Shanghai.
  shuffleFileConfigs:
    log4j2.properties: |
      monitorInterval=30
      rootLogger.level = INFO
      rootLogger.appenderRef.console.ref = ConsoleAppender
      rootLogger.appenderRef.rolling.ref = RollingFileAppender

      # Log all info to the console
      appender.console.name = ConsoleAppender
      appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %-60c %x - %m%n

      # Log all info in the given rolling file
      appender.rolling.name = RollingFileAppender
      appender.rolling.type = RollingFile
      appender.rolling.append = true
      appender.rolling.fileName = ${sys:log.file}
      appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%t] %-60c %x - %m%n
      appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size = 256MB
      appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
      appender.rolling.strategy.type = DefaultRolloverStrategy
      appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
```
- Note that `remote-shuffle.ha.zookeeper.quorum` should be filled in according to the actual environment.
- Note that `remote-shuffle.kubernetes.container.image` should be set to the shuffle service image built from the source code.
- Note that `remote-shuffle.kubernetes.worker.volume.host-paths` should be set to the actual storage path on the host to be used.
- Note that `remote-shuffle.kubernetes.manager.env-vars` and `remote-shuffle.kubernetes.worker.env-vars` should be filled in to specify the right time zone.
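The placeholders above can be filled in with any editor; a scripted sketch using `sed` might look like the following. The image tag, Zookeeper quorum, and host directory are example values only, and a minimal stand-in template is created here so the snippet is self-contained (in a real setup you would copy `conf/kubernetes-shuffle-cluster-template.yaml` instead):

```shell
# For illustration only: create a minimal stand-in for the real template file.
cat > shuffle-cluster.yaml <<'EOF'
remote-shuffle.kubernetes.container.image: <docker image>
remote-shuffle.ha.zookeeper.quorum: <zookeeper quorum>
remote-shuffle.kubernetes.worker.volume.host-paths: name:disk,path:<dir on host>,mountPath:/data
EOF

# Example values; replace with your own environment's settings.
IMAGE="docker.io/flinkremoteshuffle/flink-remote-shuffle:1.0.0"
QUORUM="zk1.host:2181,zk2.host:2181,zk3.host:2181"
HOST_DIR="/mnt/disk1"

# Substitute each placeholder in place.
sed -i "s|<docker image>|$IMAGE|" shuffle-cluster.yaml
sed -i "s|<zookeeper quorum>|$QUORUM|" shuffle-cluster.yaml
sed -i "s|<dir on host>|$HOST_DIR|" shuffle-cluster.yaml

# Then: kubectl apply -f shuffle-cluster.yaml
```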
If you want to build a new image by yourself, please refer to preparing docker environment and building from source.
After successfully running the above command `kubectl apply -f XXX`, a new shuffle service cluster will be started.
To submit a Flink job, please refer to starting a Flink cluster and submitting a Flink job.
If you would like to run Flink jobs on Kubernetes, you need to follow the below steps:
- First of all, you need to build a new Flink docker image which contains the remote shuffle plugin JAR file. Please refer to how to customize the Flink Docker image for more information. The following is a simple customized Flink Dockerfile example:

```dockerfile
FROM flink
# The path of the shuffle plugin JAR should be the lib/ directory of the remote shuffle
# distribution, which needs to be replaced by the real path in your environment.
COPY /<Path of the shuffle plugin JAR>/shuffle-plugin-<version>.jar /opt/flink/lib/
```
- Then you should add the following configurations to `conf/flink-conf.yaml` in the extracted Flink directory to configure Flink to use the remote shuffle service. Please note that the configuration of `remote-shuffle.ha.zookeeper.quorum` should be exactly the same as that in `kubernetes-shuffle-cluster-template.yaml`.

```yaml
shuffle-service-factory.class: com.alibaba.flink.shuffle.plugin.RemoteShuffleServiceFactory
remote-shuffle.high-availability.mode: ZOOKEEPER
remote-shuffle.ha.zookeeper.quorum: zk1.host:2181,zk2.host:2181,zk3.host:2181
```
- Finally, you can start a Flink cluster on Kubernetes and submit a Flink job. Please refer to start a Flink cluster on Kubernetes or Flink natively on Kubernetes for more information.
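The configuration step above can also be scripted. The sketch below creates a minimal stand-in `conf/flink-conf.yaml` so it is self-contained; in a real setup the file already exists in your extracted Flink distribution, and the quorum address is an example value:

```shell
# For illustration only: create a minimal stand-in for conf/flink-conf.yaml.
mkdir -p conf
echo "taskmanager.numberOfTaskSlots: 4" > conf/flink-conf.yaml

# Append the remote shuffle settings. The Zookeeper quorum must match the one
# configured in kubernetes-shuffle-cluster-template.yaml.
cat >> conf/flink-conf.yaml <<'EOF'
shuffle-service-factory.class: com.alibaba.flink.shuffle.plugin.RemoteShuffleServiceFactory
remote-shuffle.high-availability.mode: ZOOKEEPER
remote-shuffle.ha.zookeeper.quorum: zk1.host:2181,zk2.host:2181,zk3.host:2181
EOF

grep remote-shuffle conf/flink-conf.yaml
```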
From the above YAML file templates, you can figure out how to configure remote shuffle service on Kubernetes.
Kubernetes operator related options and the log output file are specified in `kubernetes-shuffle-operator-template.yaml`.

Any option listed on the configuration page, as well as the log output format and log appender options of the `ShuffleManager` and `ShuffleWorker`s, can be configured in `kubernetes-shuffle-cluster-template.yaml`.