[FLINK-36310][docs] Remove per-job mode, run-application and scala related documents
codenohup authored and reswqa committed Sep 30, 2024
1 parent 54813b6 commit 7a60c19
Showing 18 changed files with 113 additions and 300 deletions.
86 changes: 60 additions & 26 deletions README.md
@@ -9,7 +9,7 @@ Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/)

* A streaming-first runtime that supports both batch processing and data streaming programs

* Elegant and fluent APIs in Java and Scala
* Elegant and fluent APIs in Java

* A runtime that supports very high throughput and low event latency at the same time

@@ -23,8 +23,6 @@ Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/)

* Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming)

* Built-in support for iterative programs (BSP) in the DataSet (batch) API

* Custom memory management for efficient and robust switching between in-memory and out-of-core data processing algorithms

* Compatibility layers for Apache Hadoop MapReduce
@@ -33,32 +31,68 @@ Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/)


### Streaming Example
```scala
case class WordWithCount(word: String, count: Long)

val text = env.socketTextStream(host, port, '\n')

val windowCounts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
  .sum("count")

windowCounts.print()
```
```java
// POJO class holding a word and its count
public class WordWithCount {
    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

// main method
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream(host, port);
DataStream<WordWithCount> windowCounts = text
        .flatMap(
            (FlatMapFunction<String, String>) (line, collector)
                -> Arrays.stream(line.split("\\s")).forEach(collector::collect)
        ).returns(String.class)
        .map(word -> new WordWithCount(word, 1)).returns(TypeInformation.of(WordWithCount.class))
        .keyBy(wordWithCnt -> wordWithCnt.word)
        .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
        .sum("count").returns(TypeInformation.of(WordWithCount.class));

windowCounts.print();
env.execute();
```

### Batch Example
```scala
case class WordWithCount(word: String, count: Long)

val text = env.readTextFile(path)

val counts = text.flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .groupBy("word")
  .sum("count")

counts.writeAsCsv(outputPath)
```
```java
// POJO class holding a word and its count
public class WordWithCount {
    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

// main method
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("MyInput.txt")).build();
DataStreamSource<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySource");
DataStream<WordWithCount> windowCounts = text
        .flatMap((FlatMapFunction<String, String>) (line, collector) -> Arrays
            .stream(line.split("\\s"))
            .forEach(collector::collect)).returns(String.class)
        .map(word -> new WordWithCount(word, 1)).returns(TypeInformation.of(WordWithCount.class))
        .keyBy(wordWithCnt -> wordWithCnt.word)
        .sum("count").returns(TypeInformation.of(WordWithCount.class));

windowCounts.print();
env.execute();
```


6 changes: 3 additions & 3 deletions docs/README.md
@@ -124,14 +124,14 @@ to its documentation markdown. The following are available for use:

#### Flink Artifact

{{< artifact flink-streaming-scala withScalaVersion >}}
{{< artifact flink-table-api-scala withScalaVersion >}}

This will be replaced by the maven artifact for flink-streaming-scala that users should copy into their pom.xml file. It will render out to:
This will be replaced by the maven artifact for flink-table-api-scala that users should copy into their pom.xml file. It will render out to:

```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version><!-- current flink version --></version>
</dependency>
```
3 changes: 0 additions & 3 deletions docs/config.toml
@@ -62,16 +62,13 @@ pygmentsUseClasses = true

JavaDocs = "//nightlies.apache.org/flink/flink-docs-master/api/java/"

ScalaDocs = "//nightlies.apache.org/flink/flink-docs-master/api/scala/index.html#org.apache.flink.api.scala.package"

PyDocs = "//nightlies.apache.org/flink/flink-docs-master/api/python/"

# External links at the bottom
# of the menu
MenuLinks = [
["Project Homepage", "//flink.apache.org"],
["JavaDocs", "//nightlies.apache.org/flink/flink-docs-master/api/java/"],
["ScalaDocs", "//nightlies.apache.org/flink/flink-docs-master/api/scala/index.html#org.apache.flink.api.scala.package"],
["PyDocs", "//nightlies.apache.org/flink/flink-docs-master/api/python/"]
]

17 changes: 1 addition & 16 deletions docs/content.zh/docs/concepts/flink-architecture.md
@@ -97,7 +97,7 @@ The _JobManager_ has a number of responsibilities related to coordinating the distributed execution of Flink applications

A _Flink Application_ is any user program that spawns one or more Flink jobs from its ``main()`` method. Execution of these jobs can happen in a local JVM (`LocalEnvironment`) or on a remote setup of clusters with multiple machines (``RemoteEnvironment``). For each program, the `ExecutionEnvironment` provides methods to control the job execution (e.g. setting the parallelism) and to interact with the outside world (see [Anatomy of a Flink Program]({{< ref "docs/dev/datastream/overview" >}}#anatomy-of-a-flink-program)).
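
To make the role of the execution environment concrete, here is a minimal Java sketch (an editorial illustration, not part of the quoted page; the host, port, and job name are hypothetical placeholders):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleFlinkApplication {
    public static void main(String[] args) throws Exception {
        // The environment is the handle for controlling job execution and
        // interacting with the outside world.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Controlling job execution: set a default parallelism for all operators.
        env.setParallelism(4);

        // Interacting with the outside world: read lines from a socket and print them.
        // "localhost" and 9999 are placeholder values.
        env.socketTextStream("localhost", 9999)
                .filter(line -> !line.isEmpty()) // filter keeps the element type, so no type hint is needed
                .print();

        // Spawns one Flink job from main(); depending on how the program is submitted,
        // it runs in a local JVM or on a remote cluster.
        env.execute("example-application");
    }
}
```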

The jobs of a Flink Application can be submitted to a long-running [Flink Session Cluster]({{< ref "docs/concepts/glossary" >}}#flink-session-cluster), a dedicated [Flink Job Cluster]({{< ref "docs/concepts/glossary" >}}#flink-job-cluster), or a [Flink Application Cluster]({{< ref "docs/concepts/glossary" >}}#flink-application-cluster). The difference between these options is mainly related to the cluster's lifecycle and to resource isolation guarantees.
The jobs of a Flink Application can be submitted to a long-running [Flink Session Cluster]({{< ref "docs/concepts/glossary" >}}#flink-session-cluster) or a [Flink Application Cluster]({{< ref "docs/concepts/glossary" >}}#flink-application-cluster). The difference between these options is mainly related to the cluster's lifecycle and to resource isolation guarantees.

### Flink Session Cluster

@@ -111,21 +111,6 @@ The jobs of a Flink Application can be submitted to a long-running [Flink Session Cluster
Formerly, a Flink Session Cluster was also known as a Flink Cluster in <i>session mode</i>.
{{< /hint >}}

### Flink Job Cluster

* **Cluster Lifecycle**: in a Flink Job Cluster, the available cluster manager (such as YARN) is used to spin up a cluster for each submitted job, and this cluster is available to that job only. The client first requests resources from the cluster manager to start the JobManager, then submits the job to the Dispatcher running inside that process. TaskManagers are lazily allocated based on the resource requirements of the job. Once the job is finished, the Flink Job Cluster is torn down.

* **Resource Isolation**: a fatal error in the JobManager only affects the one job running in that Flink Job Cluster.

* **Other considerations**: because the ResourceManager has to apply for and wait on external resource-management components to start the TaskManager processes and allocate resources, Flink Job Clusters are better suited to large jobs that are long-running, have high stability requirements, and are not sensitive to longer startup times.

{{< hint info >}}
Formerly, a Flink Job Cluster was also known as a Flink Cluster in <i>job (or per-job) mode</i>.
{{< /hint >}}
{{< hint info >}}
Flink Job Clusters are not supported by Kubernetes. Please refer to [Standalone Kubernetes]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#per-job-cluster-mode) and [Native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#per-job-cluster-mode).
{{< /hint >}}

### Flink Application Cluster

* **Cluster Lifecycle**: a Flink Application Cluster is a dedicated Flink cluster that only executes jobs from one Flink Application, and where the ``main()`` method runs on the cluster rather than on the client. Job submission is a one-step process: there is no need to start a Flink cluster first and then submit a job to an existing session cluster; instead, the application logic and dependencies are packaged into an executable job JAR, and the cluster entrypoint (``ApplicationClusterEntryPoint``) is responsible for calling the ``main()`` method to extract the JobGraph. This allows you to deploy a Flink application like any other application on Kubernetes, for example. The lifetime of a Flink Application Cluster is therefore bound to the lifetime of the Flink application.
41 changes: 10 additions & 31 deletions docs/content.zh/docs/deployment/cli.md
@@ -76,14 +76,10 @@ stored in a variable `JOB_ID` for the commands below:
$ export JOB_ID="cca7bc1061d61cf15238e92312c2fc20"
```

There is another action called `run-application` available to run the job in
[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode). This documentation does not address
this action individually as it works similarly to the `run` action in terms of the CLI frontend.

The `run` and `run-application` commands support passing additional configuration parameters via the
The `run` command supports passing additional configuration parameters via the
`-D` argument. For example, setting the [maximum parallelism]({{< ref "docs/deployment/config#pipeline-max-parallelism" >}})
for a job can be done by setting `-Dpipeline.max-parallelism=120`. This argument is very useful for
configuring per-job or application mode clusters, because you can pass any configuration parameter
configuring application mode clusters, because you can pass any configuration parameter
to the cluster, without changing the configuration file.

When submitting a job to an existing session cluster, only [execution configuration parameters]({{< ref "docs/deployment/config#execution" >}}) are supported.
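
As a side note, an option passed via `-D` can also be set programmatically when building the environment; the following minimal Java sketch (an editorial illustration, not part of the quoted page, and assuming the `PipelineOptions.MAX_PARALLELISM` option key) mirrors `-Dpipeline.max-parallelism=120`:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
    public static void main(String[] args) throws Exception {
        // Programmatic counterpart of `-Dpipeline.max-parallelism=120`.
        Configuration conf = new Configuration();
        conf.set(PipelineOptions.MAX_PARALLELISM, 120);

        // The configuration is applied when the environment is created.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print();
        env.execute("max-parallelism-sketch");
    }
}
```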
@@ -276,7 +272,7 @@ Use the [stop](#stopping-a-job-gracefully-creating-a-final-savepoint) action instead.

### Starting a Job from a Savepoint

Starting a job from a savepoint can be achieved using the `run` (and `run-application`) action.
Starting a job from a savepoint can be achieved using the `run` action.
```bash
$ ./bin/flink run \
--detached \
@@ -339,14 +335,6 @@ Here's an overview of actions supported by Flink's CLI tool:
or job-related arguments can be passed if necessary.
</td>
</tr>
<tr>
<td><code class="highlighter-rouge">run-application</code></td>
<td>
This action executes jobs in <a href="{{< ref "docs/deployment/overview" >}}#application-mode">
Application Mode</a>. Other than that, it requires the same parameters as the
<code class="highlighter-rouge">run</code> action.
</td>
</tr>
<tr>
<td><code class="highlighter-rouge">info</code></td>
<td>
@@ -415,17 +403,15 @@ Resource Provider section. Jobs can be submitted in different [Deployment Modes]
The parameterization of a job submission differs based on the underlying framework and Deployment Mode.

`bin/flink` offers a parameter `--target` to handle the different options. In addition to that, jobs
have to be submitted using either `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode)
and [Per-Job Mode]({{< ref "docs/deployment/overview" >}}#per-job-mode)) or `run-application` (for
[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of
have to be submitted using `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode)
and [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of
parameter combinations:
* YARN
* `./bin/flink run --target yarn-session`: Submission to an already running Flink on YARN cluster
* `./bin/flink run --target yarn-per-job`: Submission spinning up a Flink on YARN cluster in Per-Job Mode
* `./bin/flink run-application --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode
* `./bin/flink run --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode
* Kubernetes
* `./bin/flink run --target kubernetes-session`: Submission to an already running Flink on Kubernetes cluster
* `./bin/flink run-application --target kubernetes-application`: Submission spinning up a Flink on Kubernetes cluster in Application Mode
* `./bin/flink run --target kubernetes-application`: Submission spinning up a Flink on Kubernetes cluster in Application Mode
* Standalone:
* `./bin/flink run --target local`: Local submission using a MiniCluster in Session Mode
* `./bin/flink run --target remote`: Submission to an already running Flink cluster
@@ -486,16 +472,9 @@ $ ./bin/flink run \
--python examples/python/table/word_count.py
```

- Run a PyFlink job using a [YARN cluster in Per-Job Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#per-job-cluster-mode):
```bash
$ ./bin/flink run \
      --target yarn-per-job \
      --python examples/python/table/word_count.py
```

- Run a PyFlink job using a [YARN cluster in Application Mode]({{< ref "docs/deployment/resource-providers/yarn" >}}#application-mode):
```bash
$ ./bin/flink run-application -t yarn-application \
$ ./bin/flink run -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name=<ApplicationName> \
@@ -516,7 +495,7 @@ If the size of an archive file is more than 2 GB, you could upload it to a distributed file system

- Run a PyFlink application on a native Kubernetes cluster having the cluster ID `<ClusterId>`; this requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):
```bash
$ ./bin/flink run-application \
$ ./bin/flink run \
--target kubernetes-application \
--parallelism 8 \
-Dkubernetes.cluster-id=<ClusterId> \
@@ -534,7 +513,7 @@ Resource Provider section.

Besides `--pyFiles`, `--pyModule` and `--python` mentioned above, there are also some other Python
related options. Here's an overview of all the Python related options for the actions
`run` and `run-application` supported by Flink's CLI tool:
`run` supported by Flink's CLI tool:
<table class="table table-bordered">
<thead>
<tr>
24 changes: 5 additions & 19 deletions docs/content.zh/docs/deployment/overview.md
@@ -74,7 +74,6 @@ When deploying Flink, there are often multiple options available for each building block.
JobManager <a href="#deployment-modes">modes for job submissions</a>:
<ul>
<li><b>Application Mode</b>: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.</li>
<li><b>Per-Job Mode</b>: runs the cluster exclusively for one job. The job's main method (or client) runs only prior to the cluster creation.</li>
<li><b>Session Mode</b>: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers</li>
</ul>
</td>
@@ -173,7 +172,6 @@ covered by [FLINK-26606](https://issues.apache.org/jira/browse/FLINK-26606).

Flink can execute applications in one of three ways:
- in Application Mode,
- in a Per-Job Mode,
- in Session Mode.

The above modes differ in:
@@ -182,7 +180,7 @@ Flink can execute applications in one of three ways:


<!-- Image source: https://docs.google.com/drawings/d/1EfloufuOp1A7YDwZmBEsHKRLIrrbtRkoWRPcfZI5RYQ/edit?usp=sharing -->
{{< img class="img-fluid" width="80%" style="margin: 15px" src="/fig/deployment_modes.svg" alt="Figure for Deployment Modes" >}}
{{< img class="img-fluid" width="70%" style="margin: 15px" src="/fig/deployment_modes.png" alt="Figure for Deployment Modes" >}}

#### Application Mode

@@ -196,8 +194,8 @@ network bandwidth to download dependencies and ship binaries to the cluster, and
Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time,
the `main()` method of the application is executed on the JobManager. Creating a cluster per application can be
seen as creating a session cluster shared only among the jobs of a particular application, and torn down when
the application finishes. With this architecture, the *Application Mode* provides the same resource isolation
and load balancing guarantees as the *Per-Job* mode, but at the granularity of a whole application. Executing
the application finishes. With this architecture, the *Application Mode* provides resource isolation
and load balancing guarantees at the granularity of a whole application. Executing
the `main()` on the JobManager saves the CPU cycles required, as well as the bandwidth required
for downloading the dependencies locally. Furthermore, it allows for a more even spread of the network load for
downloading the dependencies of the applications in the cluster, as there is one JobManager per application.
@@ -208,7 +206,7 @@ as in the other modes. This may have implications for your code as, for example,
your environment using the `registerCachedFile()` must be accessible by the JobManager of your application.
{{< /hint >}}

Compared to the *Per-Job* mode, the *Application Mode* allows the submission of applications consisting of
The *Application Mode* allows the submission of applications consisting of
multiple jobs. The order of job execution is not affected by the deployment mode but by the call used
to launch the job. Using `execute()`, which is blocking, establishes an order and it will lead to the
execution of the "next" job being postponed until "this" job finishes. Using `executeAsync()`, which is
@@ -224,16 +222,6 @@ Additionally, when any of multiple running jobs in Application Mode (submitted for example with `executeAsync()`) gets cancelled, all jobs will be stopped and the JobManager will shut down.
Regular job completions (by the sources shutting down) are supported.
{{< /hint >}}
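
To illustrate the `execute()`/`executeAsync()` ordering described above, here is a minimal Java sketch (an editorial illustration, not part of the quoted page; the element sources and job names are placeholder assumptions):

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiJobApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job 1: execute() blocks, so the next job is submitted only after this one finishes.
        env.fromElements("a", "b", "c").print();
        env.execute("job-1");

        // Jobs 2 and 3: executeAsync() returns a JobClient immediately,
        // so both jobs can be running at the same time within one application.
        env.fromElements(1, 2, 3).print();
        JobClient job2 = env.executeAsync("job-2");

        env.fromElements(4, 5, 6).print();
        JobClient job3 = env.executeAsync("job-3");

        // Optionally block until the asynchronous jobs have finished.
        job2.getJobExecutionResult().get();
        job3.getJobExecutionResult().get();
    }
}
```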

#### Per-Job Mode

Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available resource provider
framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to
that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc) are
cleared up. This provides better resource isolation, as a misbehaving job can only bring down its own
TaskManagers. In addition, it spreads the load of book-keeping across multiple JobManagers, as there is
one per job. For these reasons, the *Per-Job* resource allocation model is preferred in many
production settings.

#### Session Mode

*Session mode* assumes an already running cluster and uses the resources of that cluster to execute any
@@ -250,9 +238,7 @@ is responsible for the book-keeping of all the jobs in the cluster.
#### Summary

In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster
and the resources are shared across all jobs. The *Per-Job* mode pays the price of spinning up a cluster
for every submitted job, but this comes with better isolation guarantees as the resources are not shared
across jobs. In this case, the lifecycle of the cluster is bound to that of the job. Finally, the
and the resources are shared across all jobs. The
*Application Mode* creates a session cluster per application and executes the application's `main()`
method on the cluster.
