From c153f383a4bd3a44b8eaa3bfcb756e6ef4a332cd Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Tue, 9 Jan 2024 15:52:33 -0500 Subject: [PATCH 1/7] Added error handling and default logic for Flink version detection --- .../iceberg/flink/util/FlinkPackage.java | 13 +++--- .../flink/util/FlinkVersionDetector.java | 44 +++++++++++++++++++ .../iceberg/flink/util/TestFlinkPackage.java | 17 +++++++ .../iceberg/flink/util/FlinkPackage.java | 13 +++--- .../flink/util/FlinkVersionDetector.java | 44 +++++++++++++++++++ .../iceberg/flink/util/TestFlinkPackage.java | 17 +++++++ .../iceberg/flink/util/FlinkPackage.java | 13 +++--- .../flink/util/FlinkVersionDetector.java | 44 +++++++++++++++++++ .../iceberg/flink/util/TestFlinkPackage.java | 17 +++++++ 9 files changed, 207 insertions(+), 15 deletions(-) create mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java create mode 100644 flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java create mode 100644 flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index 00d74d8d345c..e6034ed3a7de 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,16 +18,19 @@ */ package org.apache.iceberg.flink.util; -import org.apache.flink.streaming.api.datastream.DataStream; - public class FlinkPackage { - /** Choose {@link DataStream} class because it is one of the core Flink API. */ - private static final String VERSION = DataStream.class.getPackage().getImplementationVersion(); + + private static FlinkVersionDetector versionDetector = new FlinkVersionDetector(); private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - return VERSION; + return versionDetector.version(); + } + + static void setVersionDetector(FlinkVersionDetector detector) { + // visible for testing + versionDetector = detector; } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java new file mode 100644 index 000000000000..1531b8a4fecf --- /dev/null +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import org.apache.flink.streaming.api.datastream.DataStream; + +public class FlinkVersionDetector { + public String version() { + String version = null; + try { + version = getVersionFromJar(); + } catch (Exception e) { + /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), so the best we can do is say it's + 1.16.x below + */ + } + if (version == null) { + version = "1.16.x"; + } + return version; + } + + String getVersionFromJar() { + /* Choose {@link DataStream} class because it is one of the core Flink API. */ + return DataStream.class.getPackage().getImplementationVersion(); + } +} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 4c9b8aea8259..d80df48758b2 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -18,14 +18,31 @@ */ package org.apache.iceberg.flink.util; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ + @AfterClass + public static void cleanup() { + FlinkPackage.setVersionDetector(new FlinkVersionDetector()); + } + @Test public void testVersion() { Assert.assertEquals("1.16.2", FlinkPackage.version()); } + + @Test + public void testDefaultVersion() { + // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked + // fault to test the default logic + FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); + Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); + FlinkPackage.setVersionDetector(detectorWithReflectionError); + Assert.assertEquals("1.16.x", FlinkPackage.version()); + } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index 00d74d8d345c..e6034ed3a7de 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,16 +18,19 @@ */ package org.apache.iceberg.flink.util; -import org.apache.flink.streaming.api.datastream.DataStream; - public class FlinkPackage { - /** Choose {@link DataStream} class because it is one of the core Flink API. */ - private static final String VERSION = DataStream.class.getPackage().getImplementationVersion(); + + private static FlinkVersionDetector versionDetector = new FlinkVersionDetector(); private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - return VERSION; + return versionDetector.version(); + } + + static void setVersionDetector(FlinkVersionDetector detector) { + // visible for testing + versionDetector = detector; } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java new file mode 100644 index 000000000000..5541c8e483c9 --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import org.apache.flink.streaming.api.datastream.DataStream; + +public class FlinkVersionDetector { + public String version() { + String version = null; + try { + version = getVersionFromJar(); + } catch (Exception e) { + /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), so the best we can do is say it's + 1.17.x below + */ + } + if (version == null) { + version = "1.17.x"; + } + return version; + } + + String getVersionFromJar() { + /* Choose {@link DataStream} class because it is one of the core Flink API. */ + return DataStream.class.getPackage().getImplementationVersion(); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index cf244f410288..5cd38450c70e 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -18,14 +18,31 @@ */ package org.apache.iceberg.flink.util; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ + @AfterClass + public static void cleanup() { + FlinkPackage.setVersionDetector(new FlinkVersionDetector()); + } + @Test public void testVersion() { Assert.assertEquals("1.17.1", FlinkPackage.version()); } + + @Test + public void testDefaultVersion() { + // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked fault to test + // the default logic + FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); + Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); + FlinkPackage.setVersionDetector(detectorWithReflectionError); + Assert.assertEquals("1.17.x", FlinkPackage.version()); + } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index 00d74d8d345c..e6034ed3a7de 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,16 +18,19 @@ */ package org.apache.iceberg.flink.util; -import org.apache.flink.streaming.api.datastream.DataStream; - public class FlinkPackage { - /** Choose {@link DataStream} class because it is one of the core Flink API. */ - private static final String VERSION = DataStream.class.getPackage().getImplementationVersion(); + + private static FlinkVersionDetector versionDetector = new FlinkVersionDetector(); private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - return VERSION; + return versionDetector.version(); + } + + static void setVersionDetector(FlinkVersionDetector detector) { + // visible for testing + versionDetector = detector; } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java new file mode 100644 index 000000000000..22d5671f0f30 --- /dev/null +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import org.apache.flink.streaming.api.datastream.DataStream; + +public class FlinkVersionDetector { + public String version() { + String version = null; + try { + version = getVersionFromJar(); + } catch (Exception e) { + /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), so the best we can do is say it's + 1.18.x below + */ + } + if (version == null) { + version = "1.18.x"; + } + return version; + } + + String getVersionFromJar() { + /* Choose {@link DataStream} class because it is one of the core Flink API. */ + return DataStream.class.getPackage().getImplementationVersion(); + } +} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index a805d160d809..9e7411ea3b21 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -18,14 +18,31 @@ */ package org.apache.iceberg.flink.util; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ + @AfterClass + public static void cleanup() { + FlinkPackage.setVersionDetector(new FlinkVersionDetector()); + } + @Test public void testVersion() { Assert.assertEquals("1.18.0", FlinkPackage.version()); } + + @Test + public void testDefaultVersion() { + // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked fault to test + // the default logic + FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); + Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); + FlinkPackage.setVersionDetector(detectorWithReflectionError); + Assert.assertEquals("1.18.x", FlinkPackage.version()); + } } From 7428ec2b2c2cddf22930af8f0da0d69cb2a0ba63 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Tue, 9 Jan 2024 16:05:02 -0500 Subject: [PATCH 2/7] Added error handling and default logic for Flink version detection --- .../java/org/apache/iceberg/flink/util/TestFlinkPackage.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 9e7411ea3b21..e0356c102bc7 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -38,7 +38,8 @@ public void testVersion() { @Test public void testDefaultVersion() { - // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked fault to test + // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked + // fault to test // the default logic FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); From 4384e941cd3af0a502e37f7da0eb35637652f0eb Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Tue, 9 Jan 2024 16:09:34 -0500 Subject: [PATCH 3/7] Added error handling and default logic for Flink version detection --- .../java/org/apache/iceberg/flink/util/TestFlinkPackage.java | 4 ++-- .../java/org/apache/iceberg/flink/util/TestFlinkPackage.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 5cd38450c70e..5410e37dc8a9 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -38,8 +38,8 @@ public void testVersion() { @Test public void testDefaultVersion() { - // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked fault to test - // the default logic + // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked + // fault to test the default logic FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); FlinkPackage.setVersionDetector(detectorWithReflectionError); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index e0356c102bc7..f0fe50503c3d 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -39,8 +39,7 @@ public void testVersion() { @Test public void testDefaultVersion() { // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked - // fault to test - // the default logic + // fault to test the default logic FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); FlinkPackage.setVersionDetector(detectorWithReflectionError); From adadc1944585e11680b44eaca02f03d5cc994886 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Thu, 11 Jan 2024 14:24:08 -0500 Subject: [PATCH 4/7] Added error handling and default logic for Flink version detection --- .../iceberg/flink/util/FlinkPackage.java | 24 +++++++--- .../flink/util/FlinkVersionDetector.java | 44 ------------------- .../iceberg/flink/util/TestFlinkPackage.java | 22 +++++----- .../iceberg/flink/util/FlinkPackage.java | 24 +++++++--- .../flink/util/FlinkVersionDetector.java | 44 ------------------- .../iceberg/flink/util/TestFlinkPackage.java | 22 +++++----- .../iceberg/flink/util/FlinkPackage.java | 24 +++++++--- .../flink/util/FlinkVersionDetector.java | 44 ------------------- .../iceberg/flink/util/TestFlinkPackage.java | 22 +++++----- 9 files changed, 93 insertions(+), 177 deletions(-) delete mode 100644 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java delete mode 100644 flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java delete mode 100644 flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index e6034ed3a7de..bc11613ef684 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,19 +18,33 @@ */ package org.apache.iceberg.flink.util; +import org.apache.flink.streaming.api.datastream.DataStream; + public class FlinkPackage { - private static FlinkVersionDetector versionDetector = new FlinkVersionDetector(); + public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - return versionDetector.version(); + String version = null; + try { + version = getVersionFromJar(); + } catch (Exception e) { + /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), so the best we can do is say it's + unknown + */ + } + if (version == null) { + version = FLINK_UNKNOWN_VERSION; + } + return version; } - static void setVersionDetector(FlinkVersionDetector detector) { - // visible for testing - versionDetector = detector; + static String getVersionFromJar() { + /* Choose {@link DataStream} class because it is one of the core Flink API. */ + return DataStream.class.getPackage().getImplementationVersion(); } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java deleted file mode 100644 index 1531b8a4fecf..000000000000 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.util; - -import org.apache.flink.streaming.api.datastream.DataStream; - -public class FlinkVersionDetector { - public String version() { - String version = null; - try { - version = getVersionFromJar(); - } catch (Exception e) { - /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), so the best we can do is say it's - 1.16.x below - */ - } - if (version == null) { - version = "1.16.x"; - } - return version; - } - - String getVersionFromJar() { - /* Choose {@link DataStream} class because it is one of the core Flink API. */ - return DataStream.class.getPackage().getImplementationVersion(); - } -} diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index d80df48758b2..6ffb9e44e3e1 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -18,19 +18,14 @@ */ package org.apache.iceberg.flink.util; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ - @AfterClass - public static void cleanup() { - FlinkPackage.setVersionDetector(new FlinkVersionDetector()); - } - @Test public void testVersion() { Assert.assertEquals("1.16.2", FlinkPackage.version()); @@ -40,9 +35,16 @@ public void testVersion() { public void testDefaultVersion() { // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked // fault to test the default logic - FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); - Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); - FlinkPackage.setVersionDetector(detectorWithReflectionError); - Assert.assertEquals("1.16.x", FlinkPackage.version()); + try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { + mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); + mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); + } + + try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { + mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); + mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); + } } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index e6034ed3a7de..bc11613ef684 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,19 +18,33 @@ */ package org.apache.iceberg.flink.util; +import org.apache.flink.streaming.api.datastream.DataStream; + public class FlinkPackage { - private static FlinkVersionDetector versionDetector = new FlinkVersionDetector(); + public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - return versionDetector.version(); + String version = null; + try { + version = getVersionFromJar(); + } catch (Exception e) { + /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), so the best we can do is say it's + unknown + */ + } + if (version == null) { + version = FLINK_UNKNOWN_VERSION; + } + return version; } - static void setVersionDetector(FlinkVersionDetector detector) { - // visible for testing - versionDetector = detector; + static String getVersionFromJar() { + /* Choose {@link DataStream} class because it is one of the core Flink API. */ + return DataStream.class.getPackage().getImplementationVersion(); } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java deleted file mode 100644 index 5541c8e483c9..000000000000 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.util; - -import org.apache.flink.streaming.api.datastream.DataStream; - -public class FlinkVersionDetector { - public String version() { - String version = null; - try { - version = getVersionFromJar(); - } catch (Exception e) { - /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), so the best we can do is say it's - 1.17.x below - */ - } - if (version == null) { - version = "1.17.x"; - } - return version; - } - - String getVersionFromJar() { - /* Choose {@link DataStream} class because it is one of the core Flink API. */ - return DataStream.class.getPackage().getImplementationVersion(); - } -} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 5410e37dc8a9..94a889011036 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -18,19 +18,14 @@ */ package org.apache.iceberg.flink.util; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ - @AfterClass - public static void cleanup() { - FlinkPackage.setVersionDetector(new FlinkVersionDetector()); - } - @Test public void testVersion() { Assert.assertEquals("1.17.1", FlinkPackage.version()); @@ -40,9 +35,16 @@ public void testVersion() { public void testDefaultVersion() { // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked // fault to test the default logic - FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); - Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); - FlinkPackage.setVersionDetector(detectorWithReflectionError); - Assert.assertEquals("1.17.x", FlinkPackage.version()); + try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { + mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); + mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); + } + + try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { + mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); + mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); + } } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index e6034ed3a7de..bc11613ef684 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,19 +18,33 @@ */ package org.apache.iceberg.flink.util; +import org.apache.flink.streaming.api.datastream.DataStream; + public class FlinkPackage { - private static FlinkVersionDetector versionDetector = new FlinkVersionDetector(); + public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - return versionDetector.version(); + String version = null; + try { + version = getVersionFromJar(); + } catch (Exception e) { + /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), so the best we can do is say it's + unknown + */ + } + if (version == null) { + version = FLINK_UNKNOWN_VERSION; + } + return version; } - static void setVersionDetector(FlinkVersionDetector detector) { - // visible for testing - versionDetector = detector; + static String getVersionFromJar() { + /* Choose {@link DataStream} class because it is one of the core Flink API. */ + return DataStream.class.getPackage().getImplementationVersion(); } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java deleted file mode 100644 index 22d5671f0f30..000000000000 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkVersionDetector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.flink.util; - -import org.apache.flink.streaming.api.datastream.DataStream; - -public class FlinkVersionDetector { - public String version() { - String version = null; - try { - version = getVersionFromJar(); - } catch (Exception e) { - /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), so the best we can do is say it's - 1.18.x below - */ - } - if (version == null) { - version = "1.18.x"; - } - return version; - } - - String getVersionFromJar() { - /* Choose {@link DataStream} class because it is one of the core Flink API. */ - return DataStream.class.getPackage().getImplementationVersion(); - } -} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index f0fe50503c3d..540f6c8bb94d 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -18,19 +18,14 @@ */ package org.apache.iceberg.flink.util; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ - @AfterClass - public static void cleanup() { - FlinkPackage.setVersionDetector(new FlinkVersionDetector()); - } - @Test public void testVersion() { Assert.assertEquals("1.18.0", FlinkPackage.version()); @@ -40,9 +35,16 @@ public void testVersion() { public void testDefaultVersion() { // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked // fault to test the default logic - FlinkVersionDetector detectorWithReflectionError = Mockito.spy(FlinkVersionDetector.class); - Mockito.when(detectorWithReflectionError.getVersionFromJar()).thenThrow(RuntimeException.class); - FlinkPackage.setVersionDetector(detectorWithReflectionError); - Assert.assertEquals("1.18.x", FlinkPackage.version()); + try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { + mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); + mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); + } + + try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { + mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); + mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); + } } } From bb8c50d59d60f222e76525b952557255491c37c0 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Fri, 12 Jan 2024 12:11:42 -0500 Subject: [PATCH 5/7] Added error handling and default logic for Flink version detection --- .../apache/iceberg/flink/util/FlinkPackage.java | 17 ++++++++--------- .../apache/iceberg/flink/util/FlinkPackage.java | 17 ++++++++--------- .../apache/iceberg/flink/util/FlinkPackage.java | 17 ++++++++--------- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index bc11613ef684..b4a37127ef34 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.util; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { @@ -28,21 +29,19 @@ private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - String version = null; try { - version = getVersionFromJar(); - } catch (Exception e) { - /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), so the best we can do is say it's + String version = getVersionFromJar(); + /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), then the best we can do is say it's unknown */ + return version != null ? version : FLINK_UNKNOWN_VERSION; + } catch (Exception e) { + return FLINK_UNKNOWN_VERSION; } - if (version == null) { - version = FLINK_UNKNOWN_VERSION; - } - return version; } + @VisibleForTesting static String getVersionFromJar() { /* Choose {@link DataStream} class because it is one of the core Flink API. */ return DataStream.class.getPackage().getImplementationVersion(); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index bc11613ef684..b4a37127ef34 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.util; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { @@ -28,21 +29,19 @@ private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - String version = null; try { - version = getVersionFromJar(); - } catch (Exception e) { - /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), so the best we can do is say it's + String version = getVersionFromJar(); + /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), then the best we can do is say it's unknown */ + return version != null ? version : FLINK_UNKNOWN_VERSION; + } catch (Exception e) { + return FLINK_UNKNOWN_VERSION; } - if (version == null) { - version = FLINK_UNKNOWN_VERSION; - } - return version; } + @VisibleForTesting static String getVersionFromJar() { /* Choose {@link DataStream} class because it is one of the core Flink API. */ return DataStream.class.getPackage().getImplementationVersion(); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index bc11613ef684..b4a37127ef34 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.util; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { @@ -28,21 +29,19 @@ private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - String version = null; try { - version = getVersionFromJar(); - } catch (Exception e) { - /* we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), so the best we can do is say it's + String version = getVersionFromJar(); + /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class + appears multiple times in the same classpath such as with shading), then the best we can do is say it's unknown */ + return version != null ? version : FLINK_UNKNOWN_VERSION; + } catch (Exception e) { + return FLINK_UNKNOWN_VERSION; } - if (version == null) { - version = FLINK_UNKNOWN_VERSION; - } - return version; } + @VisibleForTesting static String getVersionFromJar() { /* Choose {@link DataStream} class because it is one of the core Flink API. */ return DataStream.class.getPackage().getImplementationVersion(); From 428f367a7acf6db5d96c2609b29895597883c628 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Wed, 17 Jan 2024 16:28:47 -0500 Subject: [PATCH 6/7] Added error handling and default logic for Flink version detection --- .../iceberg/flink/util/FlinkPackage.java | 36 ++++++++++++------- .../iceberg/flink/util/TestFlinkPackage.java | 10 ++++-- .../iceberg/flink/util/FlinkPackage.java | 36 ++++++++++++------- .../iceberg/flink/util/TestFlinkPackage.java | 6 +++- .../iceberg/flink/util/FlinkPackage.java | 36 ++++++++++++------- .../iceberg/flink/util/TestFlinkPackage.java | 6 +++- 6 files changed, 89 insertions(+), 41 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index b4a37127ef34..353cee56bebb 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,32 +18,44 @@ */ package org.apache.iceberg.flink.util; +import java.util.concurrent.atomic.AtomicReference; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { - public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; + private static final AtomicReference VERSION = new AtomicReference<>(); + public static final String FLINK_UNKNOWN_VERSION = "FLINK-UNKNOWN-VERSION"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - try { - String version = getVersionFromJar(); - /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), then the best we can do is say it's - unknown - */ - return version != null ? version : FLINK_UNKNOWN_VERSION; - } catch (Exception e) { - return FLINK_UNKNOWN_VERSION; + if (null == VERSION.get()) { + String detectedVersion = null; + try { + detectedVersion = versionFromJar(); + // use unknown version in case exact implementation version can't be found from the jar + // (this can happen if the DataStream class appears multiple times in the same classpath + // such as with shading) + detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION; + } catch (Exception e) { + detectedVersion = FLINK_UNKNOWN_VERSION; + } + VERSION.set(detectedVersion); } + + return VERSION.get(); } @VisibleForTesting - static String getVersionFromJar() { - /* Choose {@link DataStream} class because it is one of the core Flink API. */ + static String versionFromJar() { + // Choose {@link DataStream} class because it is one of the core Flink API return DataStream.class.getPackage().getImplementationVersion(); } + + @VisibleForTesting + static void setVersion(String version) { + VERSION.set(version); + } } diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 6ffb9e44e3e1..23cefa834757 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -35,15 +35,19 @@ public void testVersion() { public void testDefaultVersion() { // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked // fault to test the default logic + + // First make sure we're not caching a version result from a previous test + FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { - mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); + mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } - + FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { - mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); + mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + FlinkPackage.setVersion(null); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index b4a37127ef34..353cee56bebb 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,32 +18,44 @@ */ package org.apache.iceberg.flink.util; +import java.util.concurrent.atomic.AtomicReference; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { - public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; + private static final AtomicReference VERSION = new AtomicReference<>(); + public static final String FLINK_UNKNOWN_VERSION = "FLINK-UNKNOWN-VERSION"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - try { - String version = getVersionFromJar(); - /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), then the best we can do is say it's - unknown - */ - return version != null ? version : FLINK_UNKNOWN_VERSION; - } catch (Exception e) { - return FLINK_UNKNOWN_VERSION; + if (null == VERSION.get()) { + String detectedVersion = null; + try { + detectedVersion = versionFromJar(); + // use unknown version in case exact implementation version can't be found from the jar + // (this can happen if the DataStream class appears multiple times in the same classpath + // such as with shading) + detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION; + } catch (Exception e) { + detectedVersion = FLINK_UNKNOWN_VERSION; + } + VERSION.set(detectedVersion); } + + return VERSION.get(); } @VisibleForTesting - static String getVersionFromJar() { - /* Choose {@link DataStream} class because it is one of the core Flink API. */ + static String versionFromJar() { + // Choose {@link DataStream} class because it is one of the core Flink API return DataStream.class.getPackage().getImplementationVersion(); } + + @VisibleForTesting + static void setVersion(String version) { + VERSION.set(version); + } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 94a889011036..99b2f56f2ae6 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -35,15 +35,19 @@ public void testVersion() { public void testDefaultVersion() { // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked // fault to test the default logic + + // First make sure we're not caching a version result from a previous test + FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } - + FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + FlinkPackage.setVersion(null); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java index b4a37127ef34..353cee56bebb 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/FlinkPackage.java @@ -18,32 +18,44 @@ */ package org.apache.iceberg.flink.util; +import java.util.concurrent.atomic.AtomicReference; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class FlinkPackage { - public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN"; + private static final AtomicReference VERSION = new AtomicReference<>(); + public static final String FLINK_UNKNOWN_VERSION = "FLINK-UNKNOWN-VERSION"; private FlinkPackage() {} /** Returns Flink version string like x.y.z */ public static String version() { - try { - String version = getVersionFromJar(); - /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class - appears multiple times in the same classpath such as with shading), then the best we can do is say it's - unknown - */ - return version != null ? version : FLINK_UNKNOWN_VERSION; - } catch (Exception e) { - return FLINK_UNKNOWN_VERSION; + if (null == VERSION.get()) { + String detectedVersion = null; + try { + detectedVersion = versionFromJar(); + // use unknown version in case exact implementation version can't be found from the jar + // (this can happen if the DataStream class appears multiple times in the same classpath + // such as with shading) + detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION; + } catch (Exception e) { + detectedVersion = FLINK_UNKNOWN_VERSION; + } + VERSION.set(detectedVersion); } + + return VERSION.get(); } @VisibleForTesting - static String getVersionFromJar() { - /* Choose {@link DataStream} class because it is one of the core Flink API. */ + static String versionFromJar() { + // Choose {@link DataStream} class because it is one of the core Flink API return DataStream.class.getPackage().getImplementationVersion(); } + + @VisibleForTesting + static void setVersion(String version) { + VERSION.set(version); + } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 540f6c8bb94d..977599c2132a 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -35,15 +35,19 @@ public void testVersion() { public void testDefaultVersion() { // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked // fault to test the default logic + + // First make sure we're not caching a version result from a previous test + FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } - + FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); + FlinkPackage.setVersion(null); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } } From be5d3aa24f4144f43cd98a273adc4b1a907270c4 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Wed, 17 Jan 2024 16:32:57 -0500 Subject: [PATCH 7/7] Added error handling and default logic for Flink version detection --- .../java/org/apache/iceberg/flink/util/TestFlinkPackage.java | 4 ++-- .../java/org/apache/iceberg/flink/util/TestFlinkPackage.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 99b2f56f2ae6..ceb3ed558698 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -39,13 +39,13 @@ public void testDefaultVersion() { // First make sure we're not caching a version result from a previous test FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { - mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); + mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { - mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); + mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); FlinkPackage.setVersion(null); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 977599c2132a..3c1fc68ac5f3 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -39,13 +39,13 @@ public void testDefaultVersion() { // First make sure we're not caching a version result from a previous test FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { - mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class); + mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version()); } FlinkPackage.setVersion(null); try (MockedStatic mockedStatic = Mockito.mockStatic(FlinkPackage.class)) { - mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null); + mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null); mockedStatic.when(FlinkPackage::version).thenCallRealMethod(); FlinkPackage.setVersion(null); Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());