Flink: Added error handling and default logic for Flink version detection #9452

Merged 7 commits on Jan 31, 2024
@@ -19,15 +19,31 @@
 package org.apache.iceberg.flink.util;

 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

 public class FlinkPackage {
-  /** Choose {@link DataStream} class because it is one of the core Flink API. */
-  private static final String VERSION = DataStream.class.getPackage().getImplementationVersion();

+  public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN";
+
   private FlinkPackage() {}

   /** Returns Flink version string like x.y.z */
   public static String version() {
-    return VERSION;
+    try {
+      String version = getVersionFromJar();
+      /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class
Contributor

Can we please use // instead of /* */ for inline comments?

+      appears multiple times in the same classpath such as with shading), then the best we can do is say it's
+      unknown
+      */
+      return version != null ? version : FLINK_UNKNOWN_VERSION;
Contributor

I think we should store the result in a variable rather than re-fetching the version on every call.

Contributor

+1. This method can be used to initialize the static VERSION variable.

Contributor Author

@nastra @stevenzwu - Iceberg's checkstyle config forbids naming a non-final static variable an all-caps name; it wants camel-case.

Should I:

  1. Name it "version", OR
  2. Name it VERSION and add a @SuppressWarnings annotation, OR
  3. Name it VERSION and change checkstyle.xml so the rule uses the same regex as static constants?

Contributor

I think we might want to slightly update the implementation to:

public class FlinkPackage {

  private static final AtomicReference<String> VERSION = new AtomicReference<>();
  private static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";

  private FlinkPackage() {}

  /** Returns Flink version string like x.y.z */
  public static String version() {
    if (null == VERSION.get()) {
      VERSION.set(initFlinkVersion());
    }

    return VERSION.get();
  }

  private static String initFlinkVersion() {
    try {
      String version = versionFromJar();
      // use unknown version in case exact implementation version can't be found from the jar (this
      // can happen if the DataStream class appears multiple times in the same classpath such as
      // with shading)
      return version != null ? version : UNKNOWN_VERSION;
    } catch (Exception e) {
      return UNKNOWN_VERSION;
    }
  }

  @VisibleForTesting
  static String versionFromJar() {
    // Choose {@link DataStream} class because it is one of the core Flink API
    return DataStream.class.getPackage().getImplementationVersion();
  }
}

Notice that I also removed the get prefix from versionFromJar(), as Iceberg typically doesn't use get-prefixed method names.

Contributor

If we do this, we might not want to create an extra method for initFlinkVersion, just something like this:

  public static String version() {
    if (null == VERSION.get()) {
      try {
        String jarVersion = versionFromJar();
        // use unknown version in case exact implementation version can't be found from the jar (this
        // can happen if the DataStream class appears multiple times in the same classpath such as
        // with shading)
        VERSION.set(jarVersion != null ? jarVersion : UNKNOWN_VERSION);
      } catch (Exception e) {
        VERSION.set(UNKNOWN_VERSION);
      }
    }

    return VERSION.get();
  }
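
One property of the check-then-set pattern in the snippets above: two threads can both observe null and both call set. That is harmless here, since both would store the same value, but if a single winning write is preferred, compareAndSet can express it. The following is only a sketch under assumptions: AtomicVersionSketch is a hypothetical class name, and String.class stands in for Flink's DataStream so the code runs without Flink on the classpath. It is not the code that was merged.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for FlinkPackage; not the code that was merged.
final class AtomicVersionSketch {
  static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";
  private static final AtomicReference<String> VERSION = new AtomicReference<>();

  private AtomicVersionSketch() {}

  static String version() {
    String current = VERSION.get();
    if (current == null) {
      // Only the first writer succeeds; a concurrent loser keeps the winner's value.
      VERSION.compareAndSet(null, detect());
      current = VERSION.get();
    }
    return current;
  }

  private static String detect() {
    try {
      // Assumption: String.class replaces DataStream.class for illustration only.
      Package pkg = String.class.getPackage();
      String version = pkg == null ? null : pkg.getImplementationVersion();
      return version != null ? version : UNKNOWN_VERSION;
    } catch (Exception e) {
      return UNKNOWN_VERSION;
    }
  }
}
```

Whether the extra atomicity is worth it for a value that is identical on every computation is a judgment call; the plain get/set form is likely sufficient.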

Contributor

Would this simpler approach work, where initFlinkVersion implements the try-catch?

private static final String VERSION = initFlinkVersion();
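
For illustration, the static-initializer approach asked about here might look roughly like the sketch below. The class name StaticVersionSketch is hypothetical, and String.class is used in place of DataStream so the example runs without Flink; the fallback constant mirrors the UNKNOWN_VERSION name from the earlier suggestion.

```java
// Hypothetical stand-in for FlinkPackage; a sketch, not the merged implementation.
final class StaticVersionSketch {
  static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";

  // Computed exactly once, when the class is initialized.
  private static final String VERSION = initVersion();

  private StaticVersionSketch() {}

  static String version() {
    return VERSION;
  }

  private static String initVersion() {
    try {
      // Assumption: String.class replaces DataStream.class for illustration only.
      Package pkg = String.class.getPackage();
      String version = pkg == null ? null : pkg.getImplementationVersion();
      // getImplementationVersion() returns null when the manifest attribute is absent.
      return version != null ? version : UNKNOWN_VERSION;
    } catch (Exception e) {
      return UNKNOWN_VERSION;
    }
  }
}
```

The trade-off is that the value is fixed at class-initialization time, so a single JVM can only ever observe one of the two code paths.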

Contributor Author

@stevenzwu - making VERSION private static final String makes it untestable, because you can't test the normal and default code paths in the same JVM.

@nastra @pvary - Using an AtomicReference would work, and if that's what you want I'll do it, but isn't that more complexity than this logic requires? private static volatile String works just fine, aside from the checkstyle issue mentioned above -- I need to either tweak the checkstyle settings, or give it a camelCase name.
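
To make the volatile alternative concrete, here is a minimal sketch. Everything in it is an assumption for illustration: the class name VolatileVersionSketch, the Supplier seam named lookup, and the reset() hook are hypothetical and replace the Mockito static mocking used in the actual test; String.class stands in for DataStream so the code runs without Flink.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for FlinkPackage; the Supplier seam and reset() are illustrative only.
final class VolatileVersionSketch {
  static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";

  // Non-final static field, so it takes a camelCase name under the naming rule described above.
  private static volatile String cachedVersion;

  // Assumption: String.class stands in for DataStream.class so the sketch runs without Flink.
  static volatile Supplier<String> lookup =
      () -> String.class.getPackage().getImplementationVersion();

  private VolatileVersionSketch() {}

  static String version() {
    String result = cachedVersion;
    if (result == null) {
      // Racing threads may both resolve, but they would store the same value.
      result = resolve();
      cachedVersion = result;
    }
    return result;
  }

  private static String resolve() {
    try {
      String version = lookup.get();
      return version != null ? version : UNKNOWN_VERSION;
    } catch (Exception e) {
      return UNKNOWN_VERSION;
    }
  }

  // Test-only hook: clears the cache so another code path can run in the same JVM.
  static void reset() {
    cachedVersion = null;
  }
}
```

Because the field is not final, a test can clear it and exercise the normal path and both fallback paths within one JVM, which is the testability concern raised above.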

Contributor Author

In general, this is a pretty simple patch and it's been going back and forth for a week. Can we come to a consensus on what exactly the patch needs, so that if those things are done it's ready for commit? Thanks!

Contributor Author

Uploaded a 1-method version using AtomicReference similar to what @nastra and @pvary were suggesting.

+    } catch (Exception e) {
+      return FLINK_UNKNOWN_VERSION;
+    }
   }
+
+  @VisibleForTesting
+  static String getVersionFromJar() {
+    /* Choose {@link DataStream} class because it is one of the core Flink API. */
+    return DataStream.class.getPackage().getImplementationVersion();
+  }
 }
@@ -20,6 +20,8 @@

 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;

 public class TestFlinkPackage {

@@ -28,4 +30,21 @@ public class TestFlinkPackage {
   public void testVersion() {
     Assert.assertEquals("1.16.2", FlinkPackage.version());
   }
+
+  @Test
+  public void testDefaultVersion() {
+    // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked
+    // fault to test the default logic
+    try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
+      mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class);
+      mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
+      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
+    }
+
+    try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
+      mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null);
+      mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
+      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
+    }
+  }
 }
@@ -19,15 +19,31 @@
 package org.apache.iceberg.flink.util;

 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

 public class FlinkPackage {
-  /** Choose {@link DataStream} class because it is one of the core Flink API. */
-  private static final String VERSION = DataStream.class.getPackage().getImplementationVersion();

+  public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN";
+
   private FlinkPackage() {}

   /** Returns Flink version string like x.y.z */
   public static String version() {
-    return VERSION;
+    try {
+      String version = getVersionFromJar();
+      /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class
+      appears multiple times in the same classpath such as with shading), then the best we can do is say it's
+      unknown
+      */
+      return version != null ? version : FLINK_UNKNOWN_VERSION;
+    } catch (Exception e) {
+      return FLINK_UNKNOWN_VERSION;
+    }
   }
+
+  @VisibleForTesting
+  static String getVersionFromJar() {
+    /* Choose {@link DataStream} class because it is one of the core Flink API. */
+    return DataStream.class.getPackage().getImplementationVersion();
+  }
 }
@@ -20,6 +20,8 @@

 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;

 public class TestFlinkPackage {

@@ -28,4 +30,21 @@ public class TestFlinkPackage {
   public void testVersion() {
     Assert.assertEquals("1.17.1", FlinkPackage.version());
   }
+
+  @Test
+  public void testDefaultVersion() {
+    // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked
+    // fault to test the default logic
+    try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
+      mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class);
+      mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
+      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
+    }
+
+    try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
+      mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null);
+      mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
+      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
+    }
+  }
 }
@@ -19,15 +19,31 @@
 package org.apache.iceberg.flink.util;

 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

 public class FlinkPackage {
-  /** Choose {@link DataStream} class because it is one of the core Flink API. */
-  private static final String VERSION = DataStream.class.getPackage().getImplementationVersion();

+  public static final String FLINK_UNKNOWN_VERSION = "Flink-UNKNOWN";
+
   private FlinkPackage() {}

   /** Returns Flink version string like x.y.z */
   public static String version() {
-    return VERSION;
+    try {
+      String version = getVersionFromJar();
+      /* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class
+      appears multiple times in the same classpath such as with shading), then the best we can do is say it's
+      unknown
+      */
+      return version != null ? version : FLINK_UNKNOWN_VERSION;
+    } catch (Exception e) {
+      return FLINK_UNKNOWN_VERSION;
+    }
   }
+
+  @VisibleForTesting
+  static String getVersionFromJar() {
+    /* Choose {@link DataStream} class because it is one of the core Flink API. */
+    return DataStream.class.getPackage().getImplementationVersion();
+  }
 }
@@ -20,6 +20,8 @@

 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;

 public class TestFlinkPackage {

@@ -28,4 +30,21 @@ public class TestFlinkPackage {
   public void testVersion() {
     Assert.assertEquals("1.18.0", FlinkPackage.version());
   }
+
+  @Test
+  public void testDefaultVersion() {
+    // It's difficult to reproduce a reflection error in a unit test, so we just inject a mocked
+    // fault to test the default logic
+    try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
+      mockedStatic.when(FlinkPackage::getVersionFromJar).thenThrow(RuntimeException.class);
+      mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
+      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
+    }
+
+    try (MockedStatic<FlinkPackage> mockedStatic = Mockito.mockStatic(FlinkPackage.class)) {
+      mockedStatic.when(FlinkPackage::getVersionFromJar).thenReturn(null);
+      mockedStatic.when(FlinkPackage::version).thenCallRealMethod();
+      Assert.assertEquals(FlinkPackage.FLINK_UNKNOWN_VERSION, FlinkPackage.version());
+    }
+  }
 }