-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Added error handling and default logic for Flink version detection #9452
Conversation
try { | ||
version = getVersionFromJar(); | ||
} catch (Exception e) { | ||
/* we can't detect the exact implementation version from the jar (this can happen if the DataStream class |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it a packaging issue, if we have multiple org.apache.flink.streaming.api.datastream.DataStream
classes on the classpath?
In this case the load order of these classes might depend on timining, and could cause issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary - Yes, it's a packaging issue. In my use case that reproduced the problem, the two instances of DataStream on the classpath are actually identical, due to unrelocated shading. That normally would cause no issues aside from build warnings. Of course we're fixing that too.
Whether the packaging problem causes other bugs or not though, Iceberg's reaction shouldn't be "throw an NPE and crash the Flink pipeline", which is what currently happens and this PR fixes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, and just fyi, I'm not affiliated with the engineers who posted the original issue this patch fixes; I just came across it while I was investigating. So this has happened at least twice in real-world use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did we get the NullPointerException
in this case?
By my experience if we have the class on the classpath multiple times, Java just uses the first one it founds. Was it because the shading messed up with the metadata of the jar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know for sure, but the shading messing up the metadata is my best guess too.
In addition to the unit test in the patch, I did confirm that the patch resolves the issue in a real Flink cluster.
|
||
import org.apache.flink.streaming.api.datastream.DataStream; | ||
|
||
public class FlinkVersionDetector { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to create a new class for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally made the separate class to make mocking the logic in the unit test easier. Just moved all the logic to FlinkPackage and used static mocking to do the test.
*/ | ||
} | ||
if (version == null) { | ||
version = "1.16.x"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This beats the reason for the automatic version detection. The original goal was to avoid versions in java code. See: #6206
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe go for: "Flink-UNKNOWN" in this case?
CC: @stevenzwu, @nastra, @Fokko
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I switched the patch to use a single "Flink-UNKNOWN" constant as you suggested.
1.16.x below | ||
*/ | ||
} | ||
if (version == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not set this directly in the catch block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting the default in a version == null
check also takes care of the case where the reflection returns null instead of an exception, which I saw happen in our TaskManagers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me this is more readable:
public static String version() {
try {
String version = getVersionFromJar();
return version != null ? version : FLINK_UNKNOWN_VERSION;
} catch (Exception e) {
return FLINK_UNKNOWN_VERSION;
}
}
@VisibleForTesting
static String getVersionFromJar() {
/* Choose {@link DataStream} class because it is one of the core Flink API. */
return DataStream.class.getPackage().getImplementationVersion();
}
The reasons:
- Easier to read if the error is handled immediately in the
catch
block - Easier to read if the
null
is handled when we fetch the data - Add
@VisibleForTesting
annotation to the method to tell why it is not private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have made the changes you suggested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1.
Let's see if @stevenzwu, @nastra or @Fokko has any comments
return VERSION; | ||
try { | ||
String version = getVersionFromJar(); | ||
/* If we can't detect the exact implementation version from the jar (this can happen if the DataStream class |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we please use // instead of /* */ for inline comments?
appears multiple times in the same classpath such as with shading), then the best we can do is say it's | ||
unknown | ||
*/ | ||
return version != null ? version : FLINK_UNKNOWN_VERSION; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should store the result in a variable rather than re-fetching the version on every call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. this method can be used to initialize the static VERSION
variable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nastra @stevenzwu - Iceberg's checkstyle config forbids naming a non-final static variable an all-caps name; it wants camel-case.
Should I:
- name it "version" OR
- Name it VERSION and add a SuppressWarning flag OR
- Name it VERSION and fix the checkstyle.xml to use the same regex as static constants use
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we might want to slightly update the implementation to
public class FlinkPackage {
private static final AtomicReference<String> VERSION = new AtomicReference<>();
private static final String UNKNOWN_VERSION = "UNKNOWN-VERSION";
private FlinkPackage() {}
/** Returns Flink version string like x.y.z */
public static String version() {
if (null == VERSION.get()) {
VERSION.set(initFlinkVersion());
}
return VERSION.get();
}
private static String initFlinkVersion() {
try {
String version = versionFromJar();
// use unknown version in case exact implementation version can't be found from the jar (this
// can happen if the DataStream class appears multiple times in the same classpath such as
// with shading)
return version != null ? version : UNKNOWN_VERSION;
} catch (Exception e) {
return UNKNOWN_VERSION;
}
}
@VisibleForTesting
static String versionFromJar() {
// Choose {@link DataStream} class because it is one of the core Flink API
return DataStream.class.getPackage().getImplementationVersion();
}
}
Notice that I also removed the get
prefix from versionFromJar()
as Iceberg typically doesn't use get
methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do this, we might not want to create an extra method for initFlinkVersion
, just something like this:
public static String version() {
if (null == VERSION.get()) {
try {
String jarVersion = versionFromJar();
// use unknown version in case exact implementation version can't be found from the jar (this
// can happen if the DataStream class appears multiple times in the same classpath such as
// with shading)
VERSION.set(jarVersion != null ? jarVersion : UNKNOWN_VERSION);
} catch (Exception e) {
VERSION.set(UNKNOWN_VERSION);
}
}
return VERSION.get();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would this simple way work, where initFlinkVersion implements the try-catch?
private static final String VERSION = initFlinkVersion();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenzwu - making VERSION private static final String
makes it untestable, because you can't test the normal and default code paths in the same JVM.
@nastra @pvary - Using an AtomicReference would work, and if that's what you want I'll do it, but isn't that more complexity than this logic requires? private static volatile String
works just fine, aside from the checkstyle issue mentioned above -- I need to either tweak the checkstyle settings, or give it a camelCase name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, this is a pretty simple patch and it's been going back and forth for a week. Can we come to a consensus on what exactly the patch needs, so that if those things are done it's ready for commit? Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
private FlinkPackage() {} | ||
|
||
/** Returns Flink version string like x.y.z */ | ||
public static String version() { | ||
return VERSION; | ||
if (null == VERSION.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't we move the try-catch/null handling to an initVersion
method?
private static final String VERSION = initVersion()
...
private static String initVersion() {
String detectedVersion = null;
try {
detectedVersion = versionFromJar();
// use unknown version in case exact implementation version can't be found from the jar
// (this can happen if the DataStream class appears multiple times in the same classpath
// such as with shading)
detectedVersion = detectedVersion != null ? detectedVersion : FLINK_UNKNOWN_VERSION;
} catch (Exception e) {
detectedVersion = FLINK_UNKNOWN_VERSION;
}
return detectedVersion;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevenzwu - the cached value cannot be static final String
because that will make the code untestable. (static final AtomicReference<String>
is fine, because its internal state is still mutable.) If it's not final, it cannot be named VERSION
because the checkstyle config requires non-final static variables to be camelCased.
The current implementation reflects @nastra and @pvary 's last suggestions -- @nastra 's suggestion to use AtomicReference and @pvary 's suggestion to consolidate to one function.
At this point we seem to have contradictory suggestions based on slightly different aesthetic preferences since no two people write code exactly the same. I'm fine implementing any of them, but I can't do all of them.
Is the current implementation ready for commit or can we come to a consensus on what needs to be done to get it ready?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I'll wait a bit in case @pvary has some feedback
Thanks @gjacoby126 for the PR and the persistence until we decided on the right approach! |
Thanks for the reviews, @pvary @nastra and @stevenzwu! |
When FlinkEnvironmentContext initializes, it tries to calculate the exact version of Flink it uses via reflection. This can fail in certain conditions, potentially crashing the Flink pipeline. This patch will catch any failures and give a default based on the known Flink minor version.
Closes #7879