Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Added error handling and default logic for Flink version detection #9452

Merged
merged 7 commits into from
Jan 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,44 @@
*/
package org.apache.iceberg.flink.util;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

public class FlinkPackage {
/** Choose {@link DataStream} class because it is one of the core Flink API. */
private static final String VERSION = DataStream.class.getPackage().getImplementationVersion();

private static final AtomicReference<String> VERSION = new AtomicReference<>();
public static final String FLINK_UNKNOWN_VERSION = "FLINK-UNKNOWN-VERSION";

private FlinkPackage() {}

/** Returns Flink version string like x.y.z */
public static String version() {
return VERSION;
if (null == VERSION.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we move the try-catch/null handling to an initVersion method?

private static final String VERSION = initVersion()
...
private static String initVersion() {
     String detectedVersion = null;
      try {
        detectedVersion = versionFromJar();
        // use unknown version in case exact implementation version can't be found from the jar
        // (this can happen if the DataStream class appears multiple times in the same classpath
        // such as with shading)
        detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION;
      } catch (Exception e) {
        detectedVersion = FLINK_UNKNOWN_VERSION;
      }
      return detectedVersion;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevenzwu - the cached value cannot be static final String because that will make the code untestable. (static final AtomicReference<String> is fine, because its internal state is still mutable.) If it's not final, it cannot be named VERSION because the checkstyle config requires non-final static variables to be camelCased.

The current implementation reflects @nastra and @pvary 's last suggestions -- @nastra 's suggestion to use AtomicReference and @pvary 's suggestion to consolidate to one function.

At this point we seem to have contradictory suggestions based on slightly different aesthetic preferences since no two people write code exactly the same. I'm fine implementing any of them, but I can't do all of them.

Is the current implementation ready for commit or can we come to a consensus on what needs to be done to get it ready?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure it is really necessary to add a unit test for the trivial change. anyway, this is fine to me. I will leave the style decision to @nastra and @pvary

String detectedVersion = null;
try {
detectedVersion = versionFromJar();
// use unknown version in case exact implementation version can't be found from the jar
// (this can happen if the DataStream class appears multiple times in the same classpath
// such as with shading)
detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION;
} catch (Exception e) {
detectedVersion = FLINK_UNKNOWN_VERSION;
}
VERSION.set(detectedVersion);
}

return VERSION.get();
}

@VisibleForTesting
static String versionFromJar() {
// Choose {@link DataStream} class because it is one of the core Flink API
return DataStream.class.getPackage().getImplementationVersion();
}

@VisibleForTesting
static void setVersion(String version) {
VERSION.set(version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.junit.Assert;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class TestFlinkPackage {

Expand All @@ -28,4 +30,25 @@ public class TestFlinkPackage {
public void testVersion() {
Assert.assertEquals("1.16.2", FlinkPackage.version());
}

@Test
public void testDefaultVersion() {
// It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked
// fault to test the default logic

// First make sure we're not caching a version result from a previous test
FlinkPackage.setVersion(null);
try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
}
FlinkPackage.setVersion(null);
try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
FlinkPackage.setVersion(null);
Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,44 @@
*/
package org.apache.iceberg.flink.util;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

public class FlinkPackage {
/** Choose {@link DataStream} class because it is one of the core Flink API. */
private static final String VERSION = DataStream.class.getPackage().getImplementationVersion();

private static final AtomicReference<String> VERSION = new AtomicReference<>();
public static final String FLINK_UNKNOWN_VERSION = "FLINK-UNKNOWN-VERSION";

private FlinkPackage() {}

/** Returns Flink version string like x.y.z */
public static String version() {
return VERSION;
if (null == VERSION.get()) {
String detectedVersion = null;
try {
detectedVersion = versionFromJar();
// use unknown version in case exact implementation version can't be found from the jar
// (this can happen if the DataStream class appears multiple times in the same classpath
// such as with shading)
detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION;
} catch (Exception e) {
detectedVersion = FLINK_UNKNOWN_VERSION;
}
VERSION.set(detectedVersion);
}

return VERSION.get();
}

@VisibleForTesting
static String versionFromJar() {
// Choose {@link DataStream} class because it is one of the core Flink API
return DataStream.class.getPackage().getImplementationVersion();
}

@VisibleForTesting
static void setVersion(String version) {
VERSION.set(version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.junit.Assert;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class TestFlinkPackage {

Expand All @@ -28,4 +30,25 @@ public class TestFlinkPackage {
public void testVersion() {
Assert.assertEquals("1.17.1", FlinkPackage.version());
}

@Test
public void testDefaultVersion() {
// It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked
// fault to test the default logic

// First make sure we're not caching a version result from a previous test
FlinkPackage.setVersion(null);
try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
}
FlinkPackage.setVersion(null);
try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
FlinkPackage.setVersion(null);
Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,44 @@
*/
package org.apache.iceberg.flink.util;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

public class FlinkPackage {
/** Choose {@link DataStream} class because it is one of the core Flink API. */
private static final String VERSION = DataStream.class.getPackage().getImplementationVersion();

private static final AtomicReference<String> VERSION = new AtomicReference<>();
public static final String FLINK_UNKNOWN_VERSION = "FLINK-UNKNOWN-VERSION";

private FlinkPackage() {}

/** Returns Flink version string like x.y.z */
public static String version() {
return VERSION;
if (null == VERSION.get()) {
String detectedVersion = null;
try {
detectedVersion = versionFromJar();
// use unknown version in case exact implementation version can't be found from the jar
// (this can happen if the DataStream class appears multiple times in the same classpath
// such as with shading)
detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION;
} catch (Exception e) {
detectedVersion = FLINK_UNKNOWN_VERSION;
}
VERSION.set(detectedVersion);
}

return VERSION.get();
}

@VisibleForTesting
static String versionFromJar() {
// Choose {@link DataStream} class because it is one of the core Flink API
return DataStream.class.getPackage().getImplementationVersion();
}

@VisibleForTesting
static void setVersion(String version) {
VERSION.set(version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.junit.Assert;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class TestFlinkPackage {

Expand All @@ -28,4 +30,25 @@ public class TestFlinkPackage {
public void testVersion() {
Assert.assertEquals("1.18.0", FlinkPackage.version());
}

@Test
public void testDefaultVersion() {
// It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked
// fault to test the default logic

// First make sure we're not caching a version result from a previous test
FlinkPackage.setVersion(null);
try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenThrow(RuntimeException.class);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
}
FlinkPackage.setVersion(null);
try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
mockedStatic.when(FlinkPackage::versionFromJar).thenReturn(null);
mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
FlinkPackage.setVersion(null);
Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
}
}
}