diff --git a/README.md b/README.md
index 710c4af3616b1..6134b85e419e5 100644
--- a/README.md
+++ b/README.md
@@ -9,7 +9,7 @@ Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/)
 
 * A streaming-first runtime that supports both batch processing and data streaming programs
 
-* Elegant and fluent APIs in Java and Scala
+* Elegant and fluent APIs in Java
 
 * A runtime that supports very high throughput and low event latency at the same time
 
@@ -23,8 +23,6 @@ Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/)
 
 * Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming)
 
-* Built-in support for iterative programs (BSP) in the DataSet (batch) API
-
 * Custom memory management for efficient and robust switching between in-memory and out-of-core data processing algorithms
 
 * Compatibility layers for Apache Hadoop MapReduce
 
@@ -33,32 +31,68 @@ Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/)
 
 ### Streaming Example
 
-```scala
-case class WordWithCount(word: String, count: Long)
-
-val text = env.socketTextStream(host, port, '\n')
-
-val windowCounts = text.flatMap { w => w.split("\\s") }
-  .map { w => WordWithCount(w, 1) }
-  .keyBy("word")
-  .window(TumblingProcessingTimeWindow.of(Duration.ofSeconds(5)))
-  .sum("count")
-
-windowCounts.print()
+```java
+// pojo class WordWithCount
+public class WordWithCount {
+    public String word;
+    public int count;
+
+    public WordWithCount() {}
+
+    public WordWithCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
+
+// main method
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStreamSource<String> text = env.socketTextStream(host, port);
+DataStream<WordWithCount> windowCounts = text
+        .flatMap(
+            (FlatMapFunction<String, String>) (line, collector)
+                -> Arrays.stream(line.split("\\s")).forEach(collector::collect)
+        ).returns(String.class)
+        .map(word -> new WordWithCount(word, 1)).returns(TypeInformation.of(WordWithCount.class))
+        .keyBy(wordWithCnt -> wordWithCnt.word)
+        .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
+        .sum("count").returns(TypeInformation.of(WordWithCount.class));
+
+windowCounts.print();
+env.execute();
 ```
 
 ### Batch Example
 
-```scala
-case class WordWithCount(word: String, count: Long)
-
-val text = env.readTextFile(path)
-
-val counts = text.flatMap { w => w.split("\\s") }
-  .map { w => WordWithCount(w, 1) }
-  .groupBy("word")
-  .sum("count")
-
-counts.writeAsCsv(outputPath)
+```java
+// pojo class WordWithCount
+public class WordWithCount {
+    public String word;
+    public int count;
+
+    public WordWithCount() {}
+
+    public WordWithCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
+
+// main method
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("MyInput.txt")).build();
+DataStreamSource<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySource");
+DataStream<WordWithCount> windowCounts = text
+        .flatMap((FlatMapFunction<String, String>) (line, collector) -> Arrays
+                .stream(line.split("\\s"))
+                .forEach(collector::collect)).returns(String.class)
+        .map(word -> new WordWithCount(word, 1)).returns(TypeInformation.of(WordWithCount.class))
+        .keyBy(wordWithCount -> wordWithCount.word)
+        .sum("count").returns(TypeInformation.of(WordWithCount.class));
+
+windowCounts.print();
+env.execute();
 ```
diff --git a/docs/README.md b/docs/README.md
index 8ea0de0ecd5ef..ef118c0e96aba 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -124,14 +124,14 @@ to its documentation markdown. The following are available for use:
 
 #### Flink Artifact
 
-    {{< artifact flink-streaming-scala withScalaVersion >}}
+    {{< artifact flink-table-api-scala withScalaVersion >}}
 
-This will be replaced by the maven artifact for flink-streaming-scala that users should copy into their pom.xml file. It will render out to:
+This will be replaced by the maven artifact for flink-table-api-scala that users should copy into their pom.xml file. It will render out to:
 
 ```xml
 <dependency>
     <groupId>org.apache.flink</groupId>
-    <artifactId>flink-streaming-scala_2.12</artifactId>
+    <artifactId>flink-table-api-scala_2.12</artifactId>
 </dependency>
 ```
 
diff --git a/docs/config.toml b/docs/config.toml
index c9a623c6a4f52..85952c3f490f9 100644
--- a/docs/config.toml
+++ b/docs/config.toml
@@ -62,8 +62,6 @@ pygmentsUseClasses = true
 
   JavaDocs = "//nightlies.apache.org/flink/flink-docs-master/api/java/"
 
-  ScalaDocs = "//nightlies.apache.org/flink/flink-docs-master/api/scala/index.html#org.apache.flink.api.scala.package"
-
  PyDocs = "//nightlies.apache.org/flink/flink-docs-master/api/python/"
 
  # External links at the bottom
@@ -71,7 +69,6 @@ pygmentsUseClasses = true
   MenuLinks = [
     ["Project Homepage", "//flink.apache.org"],
     ["JavaDocs", "//nightlies.apache.org/flink/flink-docs-master/api/java/"],
-    ["ScalaDocs", "//nightlies.apache.org/flink/flink-docs-master/api/scala/index.html#org.apache.flink.api.scala.package"],
     ["PyDocs", "//nightlies.apache.org/flink/flink-docs-master/api/python/"]
   ]
 
diff --git a/docs/content.zh/docs/concepts/flink-architecture.md b/docs/content.zh/docs/concepts/flink-architecture.md
index d0cd8cfcb60b2..e57b06fcd1308 100644
--- a/docs/content.zh/docs/concepts/flink-architecture.md
+++ b/docs/content.zh/docs/concepts/flink-architecture.md
@@ -97,7 +97,7 @@ _JobManager_ 具有许多与协调 Flink 应用程序的分布式执行有关的
 
 _Flink 应用程序_ 是从其 ``main()`` 方法产生的一个或多个 Flink 作业的任何用户程序。这些作业的执行可以在本地 JVM(`LocalEnvironment`)中进行,或具有多台机器的集群的远程设置(``RemoteEnvironment``)中进行。对于每个程序,`ExecutionEnvironment` 提供了一些方法来控制作业执行(例如设置并行度)并与外界交互(请参考 [Flink 程序剖析]({{< ref "docs/dev/datastream/overview" >}}#anatomy-of-a-flink-program) )。
 
-Flink 应用程序的作业可以被提交到长期运行的 [Flink Session 集群]({{< ref "docs/concepts/glossary" >}}#flink-session-cluster)、专用的 [Flink Job 集群]({{< ref "docs/concepts/glossary" >}}#flink-job-cluster) 或 [Flink Application 集群]({{< ref "docs/concepts/glossary" >}}#flink-application-cluster)。这些选项之间的差异主要与集群的生命周期和资源隔离保证有关。
+Flink 应用程序的作业可以被提交到长期运行的 [Flink Session 集群]({{< ref "docs/concepts/glossary" >}}#flink-session-cluster) 或 [Flink Application 集群]({{< ref "docs/concepts/glossary" >}}#flink-application-cluster)。这些选项之间的差异主要与集群的生命周期和资源隔离保证有关。
 
 ### Flink Session 集群
 
@@ -111,21 +111,6 @@ Flink 应用程序的作业可以被提交到长期运行的 [Flink Session 集
 以前,Flink Session 集群也被称为 session 模式下的 Flink 集群。
 {{< /hint >}}
 
-### Flink Job 集群
-
-* **集群生命周期**:在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。
-
-* **资源隔离**:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。
-
-* **其他注意事项**:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。
-
-{{< hint info >}}
-以前,Flink Job 集群也被称为 job (or per-job) 模式下的 Flink 集群。
-{{< /hint >}}
-{{< hint info >}}
-Kubernetes 不支持 Flink Job 集群。 请参考 [Standalone Kubernetes]({{< ref "docs/deployment/resource-providers/standalone/kubernetes"
>}}#per-job-cluster-mode) 和 [Native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#per-job-cluster-mode)。 -{{< /hint >}} - ### Flink Application 集群 * **集群生命周期**:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 ``main()``方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(``ApplicationClusterEntryPoint``)负责调用 ``main()``方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。 diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md index 88b26b641985b..8d2f079af7130 100644 --- a/docs/content.zh/docs/deployment/cli.md +++ b/docs/content.zh/docs/deployment/cli.md @@ -76,14 +76,10 @@ stored in a variable `JOB_ID` for the commands below: $ export JOB_ID="cca7bc1061d61cf15238e92312c2fc20" ``` -There is another action called `run-application` available to run the job in -[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode). This documentation does not address -this action individually as it works similarly to the `run` action in terms of the CLI frontend. - -The `run` and `run-application` commands support passing additional configuration parameters via the +The `run` command support passing additional configuration parameters via the `-D` argument. For example setting the [maximum parallelism]({{< ref "docs/deployment/config#pipeline-max-parallelism" >}}#application-mode) for a job can be done by setting `-Dpipeline.max-parallelism=120`. This argument is very useful for -configuring per-job or application mode clusters, because you can pass any configuration parameter +configuring application mode clusters, because you can pass any configuration parameter to the cluster, without changing the configuration file. When submitting a job to an existing session cluster, only [execution configuration parameters]({{< ref "docs/deployment/config#execution" >}}) are supported. @@ -276,7 +272,7 @@ Use the [stop](#stopping-a-job-gracefully-creating-a-final-savepoint) action ins ### Starting a Job from a Savepoint -Starting a job from a savepoint can be achieved using the `run` (and `run-application`) action. +Starting a job from a savepoint can be achieved using the `run` action. ```bash $ ./bin/flink run \ --detached \ @@ -339,14 +335,6 @@ Here's an overview of actions supported by Flink's CLI tool: or job-related arguments can be passed if necessary. - - run-application - - This action executes jobs in }}#application-mode"> - Application Mode. Other than that, it requires the same parameters as the - run action. - - info @@ -415,17 +403,15 @@ Resource Provider section. Jobs can be submitted in different [Deployment Modes] The parameterization of a job submission differs based on the underlying framework and Deployment Mode. `bin/flink` offers a parameter `--target` to handle the different options. In addition to that, jobs -have to be submitted using either `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode) -and [Per-Job Mode]({{< ref "docs/deployment/overview" >}}#per-job-mode)) or `run-application` (for -[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of +have to be submitted using `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode) +and [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). 
See the following summary of parameter combinations: * YARN * `./bin/flink run --target yarn-session`: Submission to an already running Flink on YARN cluster - * `./bin/flink run --target yarn-per-job`: Submission spinning up a Flink on YARN cluster in Per-Job Mode - * `./bin/flink run-application --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode + * `./bin/flink run --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode * Kubernetes * `./bin/flink run --target kubernetes-session`: Submission to an already running Flink on Kubernetes cluster - * `./bin/flink run-application --target kubernetes-application`: Submission spinning up a Flink on Kubernetes cluster in Application Mode + * `./bin/flink run --target kubernetes-application`: Submission spinning up a Flink on Kubernetes cluster in Application Mode * Standalone: * `./bin/flink run --target local`: Local submission using a MiniCluster in Session Mode * `./bin/flink run --target remote`: Submission to an already running Flink cluster @@ -486,16 +472,9 @@ $ ./bin/flink run \ --python examples/python/table/word_count.py ``` -- Run a PyFlink job using a [YARN cluster in Per-Job Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#per-job-cluster-mode): -```bash -$ ./bin/flink run \ - --target yarn-per-job - --python examples/python/table/word_count.py -``` - - Run a PyFlink job using a [YARN cluster in Application Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#application-mode): ```bash -$ ./bin/flink run-application -t yarn-application \ +$ ./bin/flink run -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dyarn.application.name= \ @@ -516,7 +495,7 @@ If the size of an archive file is more than 2 GB, you could upload it to a distr - Run a PyFlink application on a native Kubernetes cluster having the cluster ID ``, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python): ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ --parallelism 8 \ -Dkubernetes.cluster-id= \ @@ -534,7 +513,7 @@ Resource Provider section. Besides `--pyFiles`, `--pyModule` and `--python` mentioned above, there are also some other Python related options. Here's an overview of all the Python related options for the actions -`run` and `run-application` supported by Flink's CLI tool: +`run` supported by Flink's CLI tool: diff --git a/docs/content.zh/docs/deployment/overview.md b/docs/content.zh/docs/deployment/overview.md index 9be46a0b60198..e3d5e0168e957 100644 --- a/docs/content.zh/docs/deployment/overview.md +++ b/docs/content.zh/docs/deployment/overview.md @@ -74,7 +74,6 @@ When deploying Flink, there are often multiple options available for each buildi JobManager modes for job submissions:
  • Application Mode: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.
-  • Per-Job Mode: runs the cluster exclusively for one job. The job's main method (or client) runs only prior to the cluster creation.
  • Session Mode: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers
@@ -173,7 +172,6 @@ covered by [FLINK-26606](https://issues.apache.org/jira/browse/FLINK-26606). Flink can execute applications in one of three ways: - in Application Mode, -- in a Per-Job Mode, - in Session Mode. The above modes differ in: @@ -182,7 +180,7 @@ Flink can execute applications in one of three ways: -{{< img class="img-fluid" width="80%" style="margin: 15px" src="/fig/deployment_modes.svg" alt="Figure for Deployment Modes" >}} +{{< img class="img-fluid" width="70%" style="margin: 15px" src="/fig/deployment_modes.png" alt="Figure for Deployment Modes" >}} #### Application Mode @@ -196,8 +194,8 @@ network bandwidth to download dependencies and ship binaries to the cluster, and Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time, the `main()` method of the application is executed on the JobManager. Creating a cluster per application can be seen as creating a session cluster shared only among the jobs of a particular application, and torn down when -the application finishes. With this architecture, the *Application Mode* provides the same resource isolation -and load balancing guarantees as the *Per-Job* mode, but at the granularity of a whole application. Executing +the application finishes. With this architecture, the *Application Mode* provides the application granularity resource isolation +and load balancing guarantees. Executing the `main()` on the JobManager allows for saving the CPU cycles required, but also save the bandwidth required for downloading the dependencies locally. Furthermore, it allows for more even spread of the network load for downloading the dependencies of the applications in the cluster, as there is one JobManager per application. @@ -208,7 +206,7 @@ as in the other modes. This may have implications for your code as, for example, your environment using the `registerCachedFile()` must be accessible by the JobManager of your application. {{< /hint >}} -Compared to the *Per-Job* mode, the *Application Mode* allows the submission of applications consisting of +The *Application Mode* allows the submission of applications consisting of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch the job. Using `execute()`, which is blocking, establishes an order and it will lead to the execution of the "next" job being postponed until "this" job finishes. Using `executeAsync()`, which is @@ -224,16 +222,6 @@ Additionally, when any of multiple running jobs in Application Mode (submitted f Regular job completions (by the sources shutting down) are supported. {{< /hint >}} -#### Per-Job Mode - -Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available resource provider -framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to -that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc) are -cleared up. This provides better resource isolation, as a misbehaving job can only bring down its own -TaskManagers. In addition, it spreads the load of book-keeping across multiple JobManagers, as there is -one per job. For these reasons, the *Per-Job* resource allocation model is the preferred mode by many -production reasons. - #### Session Mode *Session mode* assumes an already running cluster and uses the resources of that cluster to execute any @@ -250,9 +238,7 @@ is responsible for the book-keeping of all the jobs in the cluster. 
#### Summary In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster -and the resources are shared across all jobs. The *Per-Job* mode pays the price of spinning up a cluster -for every submitted job, but this comes with better isolation guarantees as the resources are not shared -across jobs. In this case, the lifecycle of the cluster is bound to that of the job. Finally, the +and the resources are shared across all jobs. The *Application Mode* creates a session cluster per application and executes the application's `main()` method on the cluster. diff --git a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md index 0f8f586d09e34..92d5199d559ab 100644 --- a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md +++ b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md @@ -100,7 +100,7 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command: ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image.ref=custom-image-name \ @@ -112,7 +112,7 @@ $ ./bin/flink run-application \ In case you have a locally available Flink job JAR, artifact upload can be used so Flink will upload the local artifact to DFS during deployment and fetch it on the deployed JobManager pod: ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ @@ -125,7 +125,7 @@ The `kubernetes.artifacts.local-upload-enabled` enables this feature, and `kuber You can add additional artifacts via the `user.artifacts.artifact-list` config option, which can contain a mix of local and remote artifacts: ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ @@ -139,14 +139,14 @@ In case the job JAR or any additional artifact is already available remotely via ```bash # FileSystem -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ s3://my-bucket/my-flink-job.jar # HTTP(S) -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ @@ -179,10 +179,6 @@ $ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my- You can override configurations set in [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}) by passing key-value pairs `-Dkey=value` to `bin/flink`. -### Per-Job Cluster Mode - -Flink on Kubernetes does not support Per-Job Cluster Mode. - ### Session Mode You have seen the deployment of a Session cluster in the [Getting Started](#getting-started) guide at the top of this page. 
diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md index c357acd0a0228..9a4d9dd37c7de 100644 --- a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md @@ -136,12 +136,6 @@ $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar $ kubectl delete -f jobmanager-application-non-ha.yaml ``` - - -### Per-Job 集群模式 - -在 Kubernetes 上部署 Standalone 集群时不支持 Per-Job 集群模式。 - ### Session 集群模式 diff --git a/docs/content.zh/docs/deployment/resource-providers/yarn.md b/docs/content.zh/docs/deployment/resource-providers/yarn.md index cf68314288f59..46f4af2600f54 100644 --- a/docs/content.zh/docs/deployment/resource-providers/yarn.md +++ b/docs/content.zh/docs/deployment/resource-providers/yarn.md @@ -83,7 +83,7 @@ Congratulations! You have successfully run a Flink application by deploying Flin ## Deployment Modes Supported by Flink on YARN -For production use, we recommend deploying Flink Applications in the [Per-job or Application Mode]({{< ref "docs/deployment/overview" >}}#deployment-modes), as these modes provide a better isolation for the Applications. +For production use, we recommend deploying Flink Applications in the [Application Mode]({{< ref "docs/deployment/overview" >}}#deployment-modes), as these modes provide a better isolation for the Applications. ### Application Mode @@ -91,7 +91,7 @@ Application Mode will launch a Flink cluster on YARN, where the main() method of The cluster will shut down as soon as the application has finished. You can manually stop the cluster using `yarn application -kill ` or by cancelling the Flink job. ```bash -./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar +./bin/flink run -t yarn-application ./examples/streaming/TopSpeedWindowing.jar ``` @@ -111,7 +111,7 @@ and pre-upload your application jar to a location accessible by all nodes in you command could look like: ```bash -./bin/flink run-application -t yarn-application \ +./bin/flink run -t yarn-application \ -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \ hdfs://myhdfs/jars/my-application.jar ``` @@ -120,28 +120,6 @@ The above will allow the job submission to be extra lightweight as the needed Fl are going to be picked up by the specified remote locations rather than be shipped to the cluster by the client. -### Per-Job Cluster Mode - -The Per-job Cluster mode will launch a Flink cluster on YARN, then run the provided application jar locally and finally submit the JobGraph to the JobManager on YARN. If you pass the `--detached` argument, the client will stop once the submission is accepted. - -The YARN cluster will stop once the job has stopped. - -```bash -./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar -``` - -Once a Per-Job Cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint. - -```bash -# List running job on the cluster -./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY -# Cancel running job -./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY -``` - -Note that cancelling your job on an Per-Job Cluster will stop the cluster. - - ### Session Mode We describe deployment with the Session Mode in the [Getting Started](#getting-started) guide at the top of the page. 
@@ -241,9 +219,9 @@ The configuration parameter for specifying the REST endpoint port is [rest.bind- When deploying Flink with Session Mode on Yarn, only the JAR file specified in startup command will be recognized as user-jars and included into user classpath. -**PerJob Mode & Application Mode** +**Application Mode** -When deploying Flink with PerJob/Application Mode on Yarn, the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder will be recognized as user-jars. +When deploying Flink with Application Mode on Yarn, the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder will be recognized as user-jars. By default Flink will include the user-jars into the system classpath. This behavior can be controlled with the [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) parameter. When setting this to `DISABLED` Flink will include the jar in the user classpath instead. diff --git a/docs/content.zh/docs/ops/debugging/debugging_classloading.md b/docs/content.zh/docs/ops/debugging/debugging_classloading.md index df55db4c2c516..cc36a2449e104 100644 --- a/docs/content.zh/docs/ops/debugging/debugging_classloading.md +++ b/docs/content.zh/docs/ops/debugging/debugging_classloading.md @@ -58,19 +58,11 @@ created for an job/application and will contain the job/application's jar files. --> -**Per-Job模式(已弃用)(Yarn)** - -当前只有Yarn支持Per-Job模式。默认情况下,Flink集群运行在Per-Job模式下时会将用户的jar文件包含在系统的classpath中。 -这种模式可以由[yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) 参数控制。 -当该参数设定为`DISABLED`时,Flink会将用户jar文件含在用户的classpath中,并由*FlinkUserCodeClassLoader*进行动态加载。 - -详细信息参见[Flink on Yarn]({{< ref "docs/deployment/resource-providers/yarn" >}})。 - **Application模式(Standalone/Yarn/Kubernetes)** 当Application模式的Flink集群基于Standalone或Kubernetes方式运行时,用户jar文件(启动命令指定的jar文件和Flink的`usrlib`目录中的jar包)会由*FlinkUserCodeClassLoader*进行动态加载。 -当Flink集群以Application模式运行时,用户jar文件(启动命令指定的jar文件和Flink的`usrlib`目录中的jar包)默认情况下会包含在系统classpath(*AppClassLoader*)。与Per-Job模式相同,当[yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar)设置为`DISABLED`时,Flink会将用户jar文件含在用户的classpath中,并由*FlinkUserCodeClassLoader*进行动态加载。 +当Flink集群以Application模式运行时,用户jar文件(启动命令指定的jar文件和Flink的`usrlib`目录中的jar包)默认情况下会包含在系统classpath(*AppClassLoader*)。当[yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar)设置为`DISABLED`时,Flink会将用户jar文件含在用户的classpath中,并由*FlinkUserCodeClassLoader*进行动态加载。 ## 倒置类加载(Inverted Class Loading)和ClassLoader解析顺序 diff --git a/docs/content/docs/concepts/flink-architecture.md b/docs/content/docs/concepts/flink-architecture.md index 722bffe1d72ad..458f0b03038ee 100644 --- a/docs/content/docs/concepts/flink-architecture.md +++ b/docs/content/docs/concepts/flink-architecture.md @@ -217,35 +217,4 @@ isolation guarantees. Formerly, a Flink Session Cluster was also known as a Flink Cluster in `session mode`. {{< /hint >}} -### Flink Job Cluster (deprecated) - -{{< hint danger >}} -Per-job mode is only supported by YARN and has been deprecated in Flink 1.15. -It will be dropped in [FLINK-26000](https://issues.apache.org/jira/browse/FLINK-26000). -Please consider application mode to launch a dedicated cluster per-job on YARN. 
-{{< /hint >}} - -* **Cluster Lifecycle**: in a Flink Job Cluster, the available cluster manager - (like YARN) is used to spin up a cluster for each submitted job - and this cluster is available to that job only. Here, the client first - requests resources from the cluster manager to start the JobManager and - submits the job to the Dispatcher running inside this process. TaskManagers - are then lazily allocated based on the resource requirements of the job. Once - the job is finished, the Flink Job Cluster is torn down. - -* **Resource Isolation**: a fatal error in the JobManager only affects the one job running in that Flink Job Cluster. - -* **Other considerations**: because the ResourceManager has to apply and wait - for external resource management components to start the TaskManager - processes and allocate resources, Flink Job Clusters are more suited to large - jobs that are long-running, have high-stability requirements and are not - sensitive to longer startup times. - -{{< hint info >}} -Formerly, a Flink Job Cluster was also known as a Flink Cluster in `job (or per-job) mode`. -{{< /hint >}} -{{< hint info >}} -Flink Job Clusters are only supported with YARN. -{{< /hint >}} - {{< top >}} diff --git a/docs/content/docs/deployment/cli.md b/docs/content/docs/deployment/cli.md index 3009a68289533..9250704ef4174 100644 --- a/docs/content/docs/deployment/cli.md +++ b/docs/content/docs/deployment/cli.md @@ -74,11 +74,7 @@ stored in a variable `JOB_ID` for the commands below: $ export JOB_ID="cca7bc1061d61cf15238e92312c2fc20" ``` -There is another action called `run-application` available to run the job in -[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode). This documentation does not address -this action individually as it works similarly to the `run` action in terms of the CLI frontend. - -The `run` and `run-application` commands support passing additional configuration parameters via the +The `run` command support passing additional configuration parameters via the `-D` argument. For example setting the [maximum parallelism]({{< ref "docs/deployment/config#pipeline-max-parallelism" >}}#application-mode) for a job can be done by setting `-Dpipeline.max-parallelism=120`. This argument is very useful for configuring application mode clusters, because you can pass any configuration parameter @@ -274,7 +270,7 @@ Use the [stop](#stopping-a-job-gracefully-creating-a-final-savepoint) action ins ### Starting a Job from a Savepoint -Starting a job from a savepoint can be achieved using the `run` (and `run-application`) action. +Starting a job from a savepoint can be achieved using the `run` action. ```bash $ ./bin/flink run \ --detached \ @@ -337,14 +333,6 @@ Here's an overview of actions supported by Flink's CLI tool: or job-related arguments can be passed if necessary. - - - -
run-application - This action executes jobs in <a href="{{< ref "docs/deployment/overview" >}}#application-mode"> - Application Mode</a>. Other than that, it requires the same parameters as the - run action. -
info @@ -413,17 +401,15 @@ Resource Provider section. Jobs can be submitted in different [Deployment Modes] The parameterization of a job submission differs based on the underlying framework and Deployment Mode. `bin/flink` offers a parameter `--target` to handle the different options. In addition to that, jobs -have to be submitted using either `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode) -and [Per-Job Mode (deprecated)]({{< ref "docs/deployment/overview" >}}#per-job-mode)) or `run-application` (for -[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of +have to be submitted using `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode) +and [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of parameter combinations: * YARN * `./bin/flink run --target yarn-session`: Submission to an already running Flink on YARN cluster - * `./bin/flink run --target yarn-per-job`: Submission spinning up a Flink on YARN cluster in Per-Job Mode (deprecated) - * `./bin/flink run-application --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode + * `./bin/flink run --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode * Kubernetes * `./bin/flink run --target kubernetes-session`: Submission to an already running Flink on Kubernetes cluster - * `./bin/flink run-application --target kubernetes-application`: Submission spinning up a Flink on Kubernetes cluster in Application Mode + * `./bin/flink run --target kubernetes-application`: Submission spinning up a Flink on Kubernetes cluster in Application Mode * Standalone: * `./bin/flink run --target local`: Local submission using a MiniCluster in Session Mode * `./bin/flink run --target remote`: Submission to an already running Flink cluster @@ -484,16 +470,9 @@ $ ./bin/flink run \ --python examples/python/table/word_count.py ``` -- Run a PyFlink job using a [YARN cluster in Per-Job Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#per-job-cluster-mode): -```bash -$ ./bin/flink run \ - --target yarn-per-job - --python examples/python/table/word_count.py -``` - - Run a PyFlink job using a [YARN cluster in Application Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#application-mode): ```bash -$ ./bin/flink run-application -t yarn-application \ +$ ./bin/flink run -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=1024m \ -Dyarn.application.name= \ @@ -514,7 +493,7 @@ If the size of an archive file is more than 2 GB, you could upload it to a distr - Run a PyFlink application on a native Kubernetes cluster having the cluster ID ``, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python): ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ --parallelism 8 \ -Dkubernetes.cluster-id= \ @@ -532,7 +511,7 @@ Resource Provider section. Besides `--pyFiles`, `--pyModule` and `--python` mentioned above, there are also some other Python related options. 
Here's an overview of all the Python related options for the actions -`run` and `run-application` supported by Flink's CLI tool: +`run` supported by Flink's CLI tool: diff --git a/docs/content/docs/deployment/overview.md b/docs/content/docs/deployment/overview.md index f87264d0b00ca..79e22a25c9b25 100644 --- a/docs/content/docs/deployment/overview.md +++ b/docs/content/docs/deployment/overview.md @@ -74,7 +74,6 @@ When deploying Flink, there are often multiple options available for each buildi JobManager modes for job submissions:
  • Application Mode: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.
-  • Per-Job Mode: runs the cluster exclusively for one job. The job's main method (or client) runs only prior to the cluster creation.
  • Session Mode: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers
@@ -175,15 +174,13 @@ covered by [FLINK-26606](https://issues.apache.org/jira/browse/FLINK-26606). Flink can execute applications in one of three ways: - in Application Mode, - in Session Mode, -- in a Per-Job Mode (deprecated). The above modes differ in: - the cluster lifecycle and resource isolation guarantees - whether the application's `main()` method is executed on the client or on the cluster. - -{{< img class="img-fluid" width="100%" style="margin: 15px" src="/fig/deployment_modes.svg" alt="Figure for Deployment Modes" >}} +{{< img class="img-fluid" width="70%" style="margin: 10px" src="/fig/deployment_modes.png" alt="Figure for Deployment Modes" >}} ### Application Mode @@ -197,8 +194,8 @@ network bandwidth to download dependencies and ship binaries to the cluster, and Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time, the `main()` method of the application is executed by the *JobManager*. Creating a cluster per application can be seen as creating a session cluster shared only among the jobs of a particular application, and turning down when -the application finishes. With this architecture, the *Application Mode* provides the same resource isolation -and load balancing guarantees as the *Per-Job* mode, but at the granularity of a whole application. +the application finishes. With this architecture, the *Application Mode* provides the application granularity resource isolation +and load balancing guarantees. The *Application Mode* builds on an assumption that the user jars are already available on the classpath (`usrlib` folder) of all Flink components that needs access to it (*JobManager*, *TaskManager*). In other words, your application comes @@ -212,7 +209,7 @@ Executing the `main()` method on the cluster may have other implications for you in your environment using the `registerCachedFile()` must be accessible by the JobManager of your application. {{< /hint >}} -Compared to the *Per-Job (deprecated)* mode, the *Application Mode* allows the submission of applications consisting of +The *Application Mode* allows the submission of applications consisting of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch the job. Using `execute()`, which is blocking, establishes an order and it will lead to the execution of the "next" job being postponed until "this" job finishes. Using `executeAsync()`, which is @@ -240,21 +237,6 @@ restarting jobs accessing the filesystem concurrently and making it unavailable Additionally, having a single cluster running multiple jobs implies more load for the JobManager, who is responsible for the book-keeping of all the jobs in the cluster. -### Per-Job Mode (deprecated) - -{{< hint danger >}} -Per-job mode is only supported by YARN and has been deprecated in Flink 1.15. -It will be dropped in [FLINK-26000](https://issues.apache.org/jira/browse/FLINK-26000). -Please consider application mode to launch a dedicated cluster per-job on YARN. -{{< /hint >}} - -Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available resource provider -framework (e.g. YARN) to spin up a cluster for each submitted job. This cluster is available to -that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc) are -cleared up. This provides better resource isolation, as a misbehaving job can only bring down its own -TaskManagers. 
In addition, it spreads the load of book-keeping across multiple JobManagers, as there is -one per job. - ### Summary In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster diff --git a/docs/content/docs/deployment/resource-providers/native_kubernetes.md b/docs/content/docs/deployment/resource-providers/native_kubernetes.md index c823f787838df..47f713cfb9df9 100644 --- a/docs/content/docs/deployment/resource-providers/native_kubernetes.md +++ b/docs/content/docs/deployment/resource-providers/native_kubernetes.md @@ -108,7 +108,7 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command: ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image.ref=custom-image-name \ @@ -120,7 +120,7 @@ $ ./bin/flink run-application \ In case you have a locally available Flink job JAR, artifact upload can be used so Flink will upload the local artifact to DFS during deployment and fetch it on the deployed JobManager pod: ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ @@ -133,7 +133,7 @@ The `kubernetes.artifacts.local-upload-enabled` enables this feature, and `kuber You can add additional artifacts via the `user.artifacts.artifact-list` config option, which can contain a mix of local and remote artifacts: ```bash -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ @@ -147,14 +147,14 @@ In case the job JAR or any additional artifact is already available remotely via ```bash # FileSystem -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ s3://my-bucket/my-flink-job.jar # HTTP(S) -$ ./bin/flink run-application \ +$ ./bin/flink run \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ diff --git a/docs/content/docs/deployment/resource-providers/yarn.md b/docs/content/docs/deployment/resource-providers/yarn.md index 7c48d526f8c2c..20f45c539720c 100644 --- a/docs/content/docs/deployment/resource-providers/yarn.md +++ b/docs/content/docs/deployment/resource-providers/yarn.md @@ -95,7 +95,7 @@ Application Mode will launch a Flink cluster on YARN, where the main() method of The cluster will shut down as soon as the application has finished. You can manually stop the cluster using `yarn application -kill ` or by cancelling the Flink job. ```bash -./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar +./bin/flink run -t yarn-application ./examples/streaming/TopSpeedWindowing.jar ``` Once an Application Mode cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint. 
@@ -114,7 +114,7 @@ and pre-upload your application jar to a location accessible by all nodes in you command could look like: ```bash -./bin/flink run-application -t yarn-application \ +./bin/flink run -t yarn-application \ -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \ hdfs://myhdfs/jars/my-application.jar ``` @@ -157,37 +157,6 @@ The YARN session client also has a few "shortcut arguments" for commonly used se {{< top >}} -### Per-Job Mode (deprecated) - -{{< hint danger >}} -Per-job mode is only supported by YARN and has been deprecated in Flink 1.15. -It will be dropped in [FLINK-26000](https://issues.apache.org/jira/browse/FLINK-26000). -Please consider application mode to launch a dedicated cluster per-job on YARN. -{{< /hint >}} - -{{< hint info >}} -For high-level intuition behind the per-job mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#per-job-mode" >}}). -{{< /hint >}} - -The Per-job Cluster mode will launch a Flink cluster on YARN, then run the provided application jar locally and finally submit the JobGraph to the JobManager on YARN. If you pass the `--detached` argument, the client will stop once the submission is accepted. - -The YARN cluster will stop once the job has stopped. - -```bash -./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar -``` - -Once a Per-Job Cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint. - -```bash -# List running job on the cluster -./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY -# Cancel running job -./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY -``` - -Note that cancelling your job on an Per-Job Cluster will stop the cluster. - ## Flink on YARN Reference ### Configuring Flink on YARN @@ -257,9 +226,9 @@ The configuration parameter for specifying the REST endpoint port is [rest.bind- When deploying Flink with Session Mode on Yarn, only the JAR file specified in startup command will be recognized as user-jars and included into user classpath. -**PerJob Mode & Application Mode** +**Application Mode** -When deploying Flink with PerJob/Application Mode on Yarn, the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder will be recognized as user-jars. +When deploying Flink with Application Mode on Yarn, the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder will be recognized as user-jars. By default Flink will include the user-jars into the system classpath. This behavior can be controlled with the [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) parameter. When setting this to `DISABLED` Flink will include the jar in the user classpath instead. diff --git a/docs/content/docs/ops/debugging/debugging_classloading.md b/docs/content/docs/ops/debugging/debugging_classloading.md index 27d25de8e4eec..99da741e92b6a 100644 --- a/docs/content/docs/ops/debugging/debugging_classloading.md +++ b/docs/content/docs/ops/debugging/debugging_classloading.md @@ -61,21 +61,13 @@ created for an job/application and will contain the job/application's jar files. --> -**Per-Job Mode (deprecated) (Yarn)** - -Currently, only Yarn supports Per-Job mode. 
By default, running a Flink cluster in Per-Job mode will include the user jars -(the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder) into the system classpath (the *AppClassLoader*). -This behavior can be controlled with the [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) config option. -When setting it to `DISABLED`, Flink will include the user jars in the user classpath and load them dynamically by *FlinkUserCodeClassLoader*. -See [Flink on Yarn]({{< ref "docs/deployment/resource-providers/yarn" >}}) for more details. - **Application Mode (Standalone/Yarn/Kubernetes)** When run a Standalone/Kubernetes Flink cluster in Application Mode, the user jars (the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder) will be loaded dynamically by *FlinkUserCodeClassLoader*. When run a Yarn Flink cluster in Application Mode, the user jars (the JAR file specified in startup command and all JAR files in Flink's `usrlib` folder) -will be included into the system classpath (the *AppClassLoader*) by default. Same as Per-Job mode, when setting the [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) +will be included into the system classpath (the *AppClassLoader*) by default. When setting the [yarn.classpath.include-user-jar]({{< ref "docs/deployment/config" >}}#yarn-classpath-include-user-jar) to `DISABLED`, Flink will include the user jars in the user classpath and load them dynamically by *FlinkUserCodeClassLoader*. ## Inverted Class Loading and ClassLoader Resolution Order diff --git a/docs/static/fig/deployment_modes.png b/docs/static/fig/deployment_modes.png new file mode 100644 index 0000000000000..3909b214fd5bb Binary files /dev/null and b/docs/static/fig/deployment_modes.png differ diff --git a/docs/static/fig/deployment_modes.svg b/docs/static/fig/deployment_modes.svg deleted file mode 100644 index a3b00933abfa3..0000000000000 --- a/docs/static/fig/deployment_modes.svg +++ /dev/null @@ -1,19 +0,0 @@ - - \ No newline at end of file
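For reference, a self-contained sketch of the streaming word-count that the updated README describes. It assumes a Flink version whose window assigners accept `java.time.Duration` (as the patched example does); the class name `SocketWindowWordCount` and the host/port values are illustrative placeholders, not part of this patch.

```java
import java.time.Duration;
import java.util.Arrays;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

public class SocketWindowWordCount {

    /** Simple POJO, mirroring the WordWithCount type used in the README example. */
    public static class WordWithCount {
        public String word;
        public int count;

        public WordWithCount() {}

        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; point these at a socket source you actually run,
        // e.g. `nc -lk 9999` on localhost.
        final String host = "localhost";
        final int port = 9999;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<WordWithCount> windowCounts = env
                .socketTextStream(host, port)
                // Split each incoming line into individual words.
                .flatMap(
                        (FlatMapFunction<String, String>) (line, collector) ->
                                Arrays.stream(line.split("\\s")).forEach(collector::collect))
                .returns(String.class)
                // Wrap each word in the POJO with an initial count of 1.
                .map(word -> new WordWithCount(word, 1))
                .returns(TypeInformation.of(WordWithCount.class))
                .keyBy(wc -> wc.word)
                // Sum counts per word over 5-second processing-time windows.
                .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
                .sum("count");

        windowCounts.print();
        env.execute("Socket Window WordCount");
    }
}
```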