Flink: Added error handling and default logic for Flink version detection #9452

Merged: 7 commits, Jan 31, 2024

gjacoby126
Contributor

When FlinkEnvironmentContext initializes, it tries to calculate the exact version of Flink it uses via reflection. This can fail in certain conditions, potentially crashing the Flink pipeline. This patch will catch any failures and give a default based on the known Flink minor version.
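As a rough illustration of the failure mode described above (a sketch, not the code in this patch): `Package.getImplementationVersion()` is documented to return null when the version is not known, and `Class.getPackage()` can itself return null for some classes, so an unguarded chained call can throw a NullPointerException. The names `VersionLookupSketch`, `UNKNOWN`, and `versionOf` are hypothetical and exist only for this example.

```java
// Hypothetical helper; not part of Iceberg or Flink.
final class VersionLookupSketch {
  // Illustrative placeholder value, chosen for this sketch only.
  static final String UNKNOWN = "UNKNOWN";

  private VersionLookupSketch() {}

  // Reads the Implementation-Version recorded for the package of `clazz`,
  // falling back to UNKNOWN when the package or the version is unavailable.
  static String versionOf(Class<?> clazz) {
    try {
      Package pkg = clazz.getPackage(); // may be null, e.g. for primitive types
      String version = pkg != null ? pkg.getImplementationVersion() : null;
      return version != null ? version : UNKNOWN;
    } catch (RuntimeException e) {
      // Any unexpected reflection failure is treated as "version not known".
      return UNKNOWN;
    }
  }
}
```

Calling `VersionLookupSketch.versionOf(int.class)` takes the null-package branch and returns the placeholder instead of throwing.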

Closes #7879

@github-actions github-actions bot added the flink label Jan 9, 2024
@gjacoby126 gjacoby126 changed the title Added error handling and default logic for Flink version detection Flink: Added error handling and default logic for Flink version detection Jan 9, 2024
try {
version = getVersionFromJar();
} catch (Exception e) {
/* we can't detect the exact implementation version from the jar (this can happen if the DataStream class
Contributor

Isn't it a packaging issue, if we have multiple org.apache.flink.streaming.api.datastream.DataStream classes on the classpath?

In this case the load order of these classes might depend on timing, and could cause issues.

Contributor Author

@pvary - Yes, it's a packaging issue. In my use case that reproduced the problem, the two instances of DataStream on the classpath are actually identical, due to unrelocated shading. That normally would cause no issues aside from build warnings. Of course we're fixing that too.

Whether the packaging problem causes other bugs or not though, Iceberg's reaction shouldn't be "throw an NPE and crash the Flink pipeline", which is what currently happens and this PR fixes.

Contributor Author

Oh, and just fyi, I'm not affiliated with the engineers who posted the original issue this patch fixes; I just came across it while I was investigating. So this has happened at least twice in real-world use.

Contributor

Why did we get the NullPointerException in this case?
In my experience, if the same class is on the classpath multiple times, Java just uses the first one it finds. Was it because the shading messed up the jar's metadata?

Contributor Author

I don't know for sure, but the shading messing up the metadata is my best guess too.

In addition to the unit test in the patch, I did confirm that the patch resolves the issue in a real Flink cluster.


import org.apache.flink.streaming.api.datastream.DataStream;

public class FlinkVersionDetector {
Contributor

No need to create a new class for this

Contributor Author

I originally made the separate class to make mocking the logic in the unit test easier. Just moved all the logic to FlinkPackage and used static mocking to do the test.

*/
}
if (version == null) {
version = "1.16.x";
Contributor

This defeats the purpose of the automatic version detection. The original goal was to avoid hard-coding versions in Java code. See: #6206

Contributor

Maybe go for: "Flink-UNKNOWN" in this case?
CC: @stevenzwu, @nastra, @Fokko

Contributor Author

I switched the patch to use a single "Flink-UNKNOWN" constant as you suggested.

1.16.x below
*/
}
if (version == null) {
Contributor

Why not set this directly in the catch block?

Contributor Author

Setting the default in a version == null check also takes care of the case where the reflection returns null instead of an exception, which I saw happen in our TaskManagers.

Contributor

For me this is more readable:

  public static String version() {
    try {
      String version = getVersionFromJar();
      return version != null ? version : FLINK_UNKNOWN_VERSION;
    } catch (Exception e) {
      return FLINK_UNKNOWN_VERSION;
    }
  }

  @VisibleForTesting
  static String getVersionFromJar() {
    /* Choose {@link DataStream} class because it is one of the core Flink APIs. */
    return DataStream.class.getPackage().getImplementationVersion();
  }

The reasons:

  • Easier to read if the error is handled immediately in the catch block
  • Easier to read if the null is handled when we fetch the data
  • Add @VisibleForTesting annotation to the method to tell why it is not private

Contributor Author

I have made the changes you suggested.

Contributor

@pvary pvary left a comment

LGTM +1.

Let's see if @stevenzwu, @nastra or @Fokko has any comments

return VERSION;
try {
String version = getVersionFromJar();
/* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class
Contributor

can we please use // instead of /* */ for inline comments?

appears multiple times in the same classpath such as with shading), then the best we can do is say it's
unknown
*/
return version != null ? version : FLINK_UNKNOWN_VERSION;
Contributor

I think we should store the result in a variable rather than re-fetching the version on every call

Contributor

+1. this method can be used to initialize the static VERSION variable

Contributor Author

@nastra @stevenzwu - Iceberg's checkstyle config forbids naming a non-final static variable an all-caps name; it wants camel-case.

Should I:

  1. Name it "version" OR
  2. Name it VERSION and add a @SuppressWarnings annotation OR
  3. Name it VERSION and fix the checkstyle.xml to use the same regex that static constants use

Contributor

I think we might want to slightly update the implementation to

public class FlinkPackage {

  private static final AtomicReference<String> VERSION = new AtomicReference<>();
  private static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";

  private FlinkPackage() {}

  /** Returns Flink version string like x.y.z */
  public static String version() {
    if (null == VERSION.get()) {
      VERSION.set(initFlinkVersion());
    }

    return VERSION.get();
  }

  private static String initFlinkVersion() {
    try {
      String version = versionFromJar();
      // use unknown version in case exact implementation version can't be found from the jar (this
      // can happen if the DataStream class appears multiple times in the same classpath such as
      // with shading)
      return version != null ? version : UNKNOWN_VERSION;
    } catch (Exception e) {
      return UNKNOWN_VERSION;
    }
  }

  @VisibleForTesting
  static String versionFromJar() {
    // Choose {@link DataStream} class because it is one of the core Flink APIs
    return DataStream.class.getPackage().getImplementationVersion();
  }
}

Notice that I also removed the get prefix from versionFromJar() as Iceberg typically doesn't use get methods

Contributor

If we do this, we might not want to create an extra method for initFlinkVersion, just something like this:

  public static String version() {
    if (null == VERSION.get()) {
      try {
        String jarVersion = versionFromJar();
        // use unknown version in case exact implementation version can't be found from the jar (this
        // can happen if the DataStream class appears multiple times in the same classpath such as
        // with shading)
        VERSION.set(jarVersion != null ? jarVersion : UNKNOWN_VERSION);
      } catch (Exception e) {
        VERSION.set(UNKNOWN_VERSION);
      }
    }

    return VERSION.get();
  }

Contributor

would this simple way work, where initFlinkVersion implements the try-catch?

private static final String VERSION = initFlinkVersion();

Contributor Author

@stevenzwu - making VERSION private static final String makes it untestable, because you can't test the normal and default code paths in the same JVM.

@nastra @pvary - Using an AtomicReference would work, and if that's what you want I'll do it, but isn't that more complexity than this logic requires? private static volatile String works just fine, aside from the checkstyle issue mentioned above -- I need to either tweak the checkstyle settings, or give it a camelCase name.
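For comparison, a minimal sketch of the `private static volatile String` alternative mentioned above. This is an assumption-laden illustration, not the code that was merged: the class name, the `detector` supplier (standing in for `versionFromJar()`), and `resetForTest` are all hypothetical, and the field is camelCase only because of the naming rule discussed earlier in this thread.

```java
import java.util.function.Supplier;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class VolatileVersionCache {
  static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";

  // Non-final static field, hence the camelCase name.
  private static volatile String cachedVersion;

  // Hypothetical seam standing in for versionFromJar(); tests can replace it.
  static volatile Supplier<String> detector = () -> null;

  private VolatileVersionCache() {}

  static String version() {
    String result = cachedVersion;
    if (result == null) {
      try {
        String detected = detector.get();
        result = detected != null ? detected : UNKNOWN_VERSION;
      } catch (RuntimeException e) {
        result = UNKNOWN_VERSION;
      }
      // A concurrent caller may compute the value too; both writes store an
      // equivalent result, so the race is harmless in this sketch.
      cachedVersion = result;
    }
    return result;
  }

  // Test-only hook: clears the cache so another code path can be exercised.
  static void resetForTest() {
    cachedVersion = null;
  }
}
```

Because the cache can be cleared, the detected-version path and the fallback path can both be exercised in one JVM, which is the property a `static final String` would not provide.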

Contributor Author

In general, this is a pretty simple patch and it's been going back and forth for a week. Can we come to a consensus on what exactly the patch needs, so that if those things are done it's ready for commit? Thanks!

Contributor Author

Uploaded a 1-method version using AtomicReference similar to what @nastra and @pvary were suggesting.


private FlinkPackage() {}

/** Returns Flink version string like x.y.z */
public static String version() {
return VERSION;
if (null == VERSION.get()) {
Contributor

why don't we move the try-catch/null handling to an initVersion method?

private static final String VERSION = initVersion();
...
private static String initVersion() {
  String detectedVersion = null;
  try {
    detectedVersion = versionFromJar();
    // use unknown version in case exact implementation version can't be found from the jar
    // (this can happen if the DataStream class appears multiple times in the same classpath
    // such as with shading)
    detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION;
  } catch (Exception e) {
    detectedVersion = FLINK_UNKNOWN_VERSION;
  }
  return detectedVersion;
}

Contributor Author

@stevenzwu - the cached value cannot be static final String because that will make the code untestable. (static final AtomicReference<String> is fine, because its internal state is still mutable.) If it's not final, it cannot be named VERSION because the checkstyle config requires non-final static variables to be camelCased.
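To make the testability point concrete, here is a sketch under stated assumptions (it is not the merged implementation): the holder is a `static final AtomicReference<String>`, so it can keep an all-caps name, while its contents can still be cleared between test cases. The `detector` supplier and `resetForTest` are hypothetical seams invented for this example; the tests in this PR are described as using static mocking instead.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class ResettableVersionHolder {
  static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";

  // The reference itself is final; only the value it holds changes.
  private static final AtomicReference<String> VERSION = new AtomicReference<>();

  // Hypothetical seam standing in for versionFromJar().
  static volatile Supplier<String> detector = () -> null;

  private ResettableVersionHolder() {}

  static String version() {
    String current = VERSION.get();
    // updateAndGet keeps an already-stored value and otherwise stores a
    // freshly detected one; it may retry detect() under contention.
    return current != null ? current : VERSION.updateAndGet(v -> v != null ? v : detect());
  }

  private static String detect() {
    try {
      String detected = detector.get();
      return detected != null ? detected : UNKNOWN_VERSION;
    } catch (RuntimeException e) {
      return UNKNOWN_VERSION;
    }
  }

  // Test-only hook: a static final String could not be reset like this.
  static void resetForTest() {
    VERSION.set(null);
  }
}
```

With a plain `static final String`, the value is fixed when the class is initialized, so only one of the two paths could be observed per JVM.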

The current implementation reflects @nastra and @pvary 's last suggestions -- @nastra 's suggestion to use AtomicReference and @pvary 's suggestion to consolidate to one function.

At this point we seem to have contradictory suggestions based on slightly different aesthetic preferences since no two people write code exactly the same. I'm fine implementing any of them, but I can't do all of them.

Is the current implementation ready for commit or can we come to a consensus on what needs to be done to get it ready?

Contributor

I am not sure it is really necessary to add a unit test for such a trivial change. Anyway, this is fine with me. I will leave the style decision to @nastra and @pvary

@gjacoby126
Contributor Author

@nastra @pvary , just checking back in. I was wondering if there were any more changes you'd like to see to this patch, or if it's ready for commit?

Contributor

@nastra nastra left a comment

LGTM, I'll wait a bit in case @pvary has some feedback

@pvary pvary merged commit 26d62c0 into apache:main Jan 31, 2024
13 checks passed
@pvary
Contributor

pvary commented Jan 31, 2024

Thanks @gjacoby126 for the PR and the persistence until we decided on the right approach!

@gjacoby126
Contributor Author

Thanks for the reviews, @pvary @nastra and @stevenzwu!

devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024
Successfully merging this pull request may close these issues.

[Flink] NullPointer when doing FlinkEnvironmentContext.init for Flink 1.17 and iceberg 1.3.0
4 participants