Skip to content

Commit

Permalink
[FLINK-35282][python] Upgrade pyarrow and beam
Browse files Browse the repository at this point in the history
  • Loading branch information
Antonio Vespoli authored and hlteoh37 committed Jun 4, 2024
1 parent e4fa72d commit 91a9e06
Show file tree
Hide file tree
Showing 33 changed files with 98 additions and 98 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/deployment/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ related options. Here's an overview of all the Python related options for the ac
<td>
Specify the path of the python interpreter used to execute the python UDF worker
(e.g.: --pyExecutable /usr/local/bin/python3).
The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0),
The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.52.0),
Pip (version >= 20.3) and SetupTools (version >= 37.0.0).
Please ensure that the specified environment meets the above requirements.
</td>
Expand Down
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/table/sqlClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
/usr/local/bin/python3). The
python UDF worker depends on
Python 3.8+, Apache Beam
(version == 2.43.0), Pip
(version >= 2.52.0), Pip
(version >= 20.3) and SetupTools
(version >= 37.0.0). Please
ensure that the specified
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/deployment/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ related options. Here's an overview of all the Python related options for the ac
<td>
Specify the path of the python interpreter used to execute the python UDF worker
(e.g.: --pyExecutable /usr/local/bin/python3).
The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0),
The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.52.0),
Pip (version >= 20.3) and SetupTools (version >= 37.0.0).
Please ensure that the specified environment meets the above requirements.
</td>
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/dev/table/sqlClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ Mode "embedded" (default) submits Flink jobs from the local machine.
/usr/local/bin/python3). The
python UDF worker depends on
Python 3.8+, Apache Beam
(version == 2.43.0), Pip
(version >= 2.52.0), Pip
(version >= 20.3) and SetupTools
(version >= 37.0.0). Please
ensure that the specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<td><h5>python.executable</h5></td>
<td style="word-wrap: break-word;">"python"</td>
<td>String</td>
<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
<td>Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam (version &gt;= 2.52.0), Pip (version &gt;= 20.3) and SetupTools (version &gt;= 37.0.0). Please ensure that the specified environment meets the above requirements. The option is equivalent to the command line option "-pyexec".</td>
</tr>
<tr>
<td><h5>python.execution-mode</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public class CliFrontendParser {
true,
"Specify the path of the python interpreter used to execute the python UDF worker "
+ "(e.g.: --pyExecutable /usr/local/bin/python3). "
+ "The python UDF worker depends on Python 3.8+, Apache Beam (version == 2.43.0), "
+ "The python UDF worker depends on Python 3.8+, Apache Beam (version >= 2.52.0), "
+ "Pip (version >= 20.3) and SetupTools (version >= 37.0.0). "
+ "Please ensure that the specified environment meets the above requirements.");

Expand Down
2 changes: 1 addition & 1 deletion flink-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The auto-generated Python docs can be found at [https://nightlies.apache.org/fli

## Python Requirements

Apache Flink Python API depends on Py4J (currently version 0.10.9.7), CloudPickle (currently version 2.2.0), python-dateutil (currently version >=2.8.0,<3), Apache Beam (currently version >=2.43.0,<2.49.0).
Apache Flink Python API depends on Py4J (currently version 0.10.9.7), CloudPickle (currently version 2.2.0), python-dateutil (currently version >=2.8.0,<3), Apache Beam (currently version >=2.52.0).

## Development Notices

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ def set_python_executable(self, python_exec: str):
.. note::
The python udf worker depends on Apache Beam (version == 2.43.0).
The python udf worker depends on Apache Beam (version >= 2.52.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
Expand Down
2 changes: 1 addition & 1 deletion flink-python/pyflink/table/table_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def set_python_executable(self, python_exec: str):
.. note::
The python udf worker depends on Apache Beam (version == 2.43.0).
The python udf worker depends on Apache Beam (version >= 2.52.0).
Please ensure that the specified environment meets the above requirements.
:param python_exec: The path of python interpreter.
Expand Down
2 changes: 1 addition & 1 deletion flink-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ requires = [
"packaging>=20.5; platform_machine=='arm64'", # macos M1
"setuptools>=18.0",
"wheel",
"apache-beam>=2.43.0,<2.49.0",
"apache-beam>=2.52.0",
"cython>=0.29.24",
"fastavro>=1.1.0,!=1.8.0"
]
Expand Down
4 changes: 2 additions & 2 deletions flink-python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,13 @@ def extracted_output_files(base_dir, file_path, output_directory):
'pyflink.bin': ['*']}

install_requires = ['py4j==0.10.9.7', 'python-dateutil>=2.8.0,<3',
'apache-beam>=2.43.0,<2.49.0',
'apache-beam>=2.52.0',
'cloudpickle>=2.2.0', 'avro-python3>=1.8.1,!=1.9.2',
'pytz>=2018.3', 'fastavro>=1.1.0,!=1.8.0', 'requests>=2.26.0',
'protobuf>=3.19.0',
'numpy>=1.22.4',
'pandas>=1.3.0',
'pyarrow>=5.0.0',
'pyarrow>=14.0.1',
'pemja==0.4.1;platform_system != "Windows"',
'httplib2>=0.19.0',
'ruamel.yaml>=0.18.4',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.NoopLock;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
import org.apache.beam.sdk.fn.server.FnService;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables.getStackTraceAsString;

/** An implementation of the Beam Fn State service. */
public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@

import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ServerInterceptors;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.netty.NettyServerBuilder;
import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.EpollServerSocketChannel;
import org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.unix.DomainSocketAddress;
import org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.internal.ThreadLocalRandom;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ServerInterceptors;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.netty.NettyServerBuilder;
import org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.epoll.EpollServerDomainSocketChannel;
import org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.epoll.EpollServerSocketChannel;
import org.apache.beam.vendor.grpc.v1p54p0.io.netty.channel.unix.DomainSocketAddress;
import org.apache.beam.vendor.grpc.v1p54p0.io.netty.util.internal.ThreadLocalRandom;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;

import java.io.File;
import java.io.IOException;
Expand All @@ -40,7 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

// This class is copied from Beam's org.apache.beam.sdk.fn.server.ServerFactory,
// can be removed after https://github.com/apache/beam/issues/21598 is fixed.
Expand Down Expand Up @@ -161,7 +161,7 @@ private static Server createServer(List<BindableService> services, InetSocketAdd
// Set the message size to max value here. The actual size is governed
// by the
// buffer size in the layers above.
.maxMessageSize(Integer.MAX_VALUE)
.maxInboundMessageSize(Integer.MAX_VALUE)
.permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
services.stream()
.forEach(
Expand Down Expand Up @@ -223,7 +223,7 @@ private static Server createServer(
.channelType(EpollServerDomainSocketChannel.class)
.workerEventLoopGroup(new EpollEventLoopGroup())
.bossEventLoopGroup(new EpollEventLoopGroup())
.maxMessageSize(Integer.MAX_VALUE)
.maxInboundMessageSize(Integer.MAX_VALUE)
.permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
for (BindableService service : services) {
// Wrap the service to extract headers
Expand Down Expand Up @@ -276,7 +276,7 @@ private static Server createServer(List<BindableService> services, InetSocketAdd
.channelType(EpollServerSocketChannel.class)
.workerEventLoopGroup(new EpollEventLoopGroup())
.bossEventLoopGroup(new EpollEventLoopGroup())
.maxMessageSize(Integer.MAX_VALUE)
.maxInboundMessageSize(Integer.MAX_VALUE)
.permitKeepAliveTime(KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS);
for (BindableService service : services) {
// Wrap the service to extract headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal;
package org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal;

import org.apache.beam.vendor.grpc.v1p48p1.com.google.common.base.Preconditions;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions;

import javax.annotation.concurrent.ThreadSafe;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public class PythonOptions {
.withDescription(
"Specify the path of the python interpreter used to execute the python "
+ "UDF worker. The python UDF worker depends on Python 3.8+, Apache Beam "
+ "(version == 2.43.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). "
+ "(version >= 2.52.0), Pip (version >= 20.3) and SetupTools (version >= 37.0.0). "
+ "Please ensure that the specified environment meets the above requirements. The "
+ "option is equivalent to the command line option \"-pyexec\".");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.python.metric.process;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
Expand All @@ -43,7 +44,6 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static RunnerApi.Coder createCoderProto(
RunnerApi.FunctionSpec.newBuilder()
.setUrn(FLINK_CODER_URN)
.setPayload(
org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf
org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf
.ByteString.copyFrom(
coderInfoDescriptor.toByteArray()))
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private RunnerApi.ParDoPayload createRevisePayload() {
RunnerApi.FunctionSpec.newBuilder()
.setUrn(STATELESS_FUNCTION_URN)
.setPayload(
org.apache.beam.vendor.grpc.v1p48p1.com.google
org.apache.beam.vendor.grpc.v1p54p0.com.google
.protobuf.ByteString.copyFrom(
proto.toByteArray()))
.build());
Expand All @@ -196,7 +196,7 @@ private RunnerApi.ParDoPayload createUdfPayload(
RunnerApi.FunctionSpec.newBuilder()
.setUrn(urn)
.setPayload(
org.apache.beam.vendor.grpc.v1p48p1.com.google
org.apache.beam.vendor.grpc.v1p54p0.com.google
.protobuf.ByteString.copyFrom(
proto.toByteArray()))
.build());
Expand Down
Loading

0 comments on commit 91a9e06

Please sign in to comment.