Skip to content

Commit

Permalink
[FLINK-36598] Provide FileSystem instance in intialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Oct 31, 2024
1 parent 60474f4 commit 0d346d4
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 60 deletions.
125 changes: 76 additions & 49 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ class FlinkReadableFile : virtual public FSSequentialFile,
~FlinkReadableFile() override {
JNIEnv* jniEnv = getJNIEnv();
if (fs_data_input_stream_instance_ != nullptr) {
JavaClassCache::JavaMethodContext closeMethod = class_cache_->GetJMethod(
JavaClassCache::JM_FLINK_FS_INPUT_STREAM_CLOSE);
jniEnv->CallVoidMethod(fs_data_input_stream_instance_,
closeMethod.javaMethod);
jniEnv->DeleteGlobalRef(fs_data_input_stream_instance_);
}
}
Expand Down Expand Up @@ -305,8 +309,16 @@ class FlinkDirectory : public FSDirectory {
};

FlinkFileSystem::FlinkFileSystem(const std::shared_ptr<FileSystem>& base_fs,
const std::string& base_path)
: FileSystemWrapper(base_fs), base_path_(TrimTrailingSlash(base_path)) {}
const std::string& base_path,
jobject file_system_instance)
: FileSystemWrapper(base_fs), base_path_(TrimTrailingSlash(base_path)) {
if (file_system_instance != nullptr) {
JNIEnv* env = getJNIEnv();
file_system_instance_ = env->NewGlobalRef(file_system_instance);
} else {
file_system_instance_ = nullptr;
}
}

FlinkFileSystem::~FlinkFileSystem() {
if (file_system_instance_ != nullptr) {
Expand All @@ -325,48 +337,60 @@ Status FlinkFileSystem::Init() {
}
class_cache_ = javaClassCache.release();

// Delegate Flink to load real FileSystem (e.g.
// S3FileSystem/OSSFileSystem/...)
JavaClassCache::JavaClassContext fileSystemClass =
class_cache_->GetJClass(JavaClassCache::JC_FLINK_FILE_SYSTEM);
JavaClassCache::JavaMethodContext fileSystemGetMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET);

JavaClassCache::JavaClassContext uriClass =
class_cache_->GetJClass(JavaClassCache::JC_URI);
JavaClassCache::JavaMethodContext uriConstructor =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR);

// Construct URI
jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str());
jobject uriInstance = jniEnv->NewObject(
uriClass.javaClass, uriConstructor.javaMethod, uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (uriInstance == nullptr) {
return CheckThenError(
std::string("NewObject Exception when Init FlinkFileSystem, ")
.append(uriClass.ToString())
.append(uriConstructor.ToString())
.append(", args: ")
.append(base_path_));
}

// Construct FileSystem
jobject fileSystemInstance = jniEnv->CallStaticObjectMethod(
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance);
jniEnv->DeleteLocalRef(uriInstance);
if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) {
if (file_system_instance_ == nullptr) {
// Delegate Flink to load real FileSystem (e.g.
// S3FileSystem/OSSFileSystem/...)
JavaClassCache::JavaClassContext fileSystemClass =
class_cache_->GetJClass(JavaClassCache::JC_FLINK_FILE_SYSTEM);
JavaClassCache::JavaMethodContext fileSystemGetMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET);

JavaClassCache::JavaClassContext uriClass =
class_cache_->GetJClass(JavaClassCache::JC_URI);
JavaClassCache::JavaMethodContext uriConstructor =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR);

// Construct URI
jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str());
jobject uriInstance = jniEnv->NewObject(
uriClass.javaClass, uriConstructor.javaMethod, uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (uriInstance == nullptr) {
return CheckThenError(
std::string("NewObject Exception when Init FlinkFileSystem, ")
.append(uriClass.ToString())
.append(uriConstructor.ToString())
.append(", args: ")
.append(base_path_));
}

// Construct FileSystem
jobject fileSystemInstance = jniEnv->CallStaticObjectMethod(
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance);
jniEnv->DeleteLocalRef(uriInstance);
if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
"CallStaticObjectMethod Exception when Init FlinkFileSystem, ")
.append(fileSystemClass.ToString())
.append(fileSystemGetMethod.ToString())
.append(", args: URI(")
.append(base_path_)
.append(")"));
}
file_system_instance_ = jniEnv->NewGlobalRef(fileSystemInstance);
jniEnv->DeleteLocalRef(fileSystemInstance);
}

if (file_system_instance_ == nullptr) {
return CheckThenError(std::string(
"Error when init flink env, the file system provided is null"));
}

if (jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
"CallStaticObjectMethod Exception when Init FlinkFileSystem, ")
.append(fileSystemClass.ToString())
.append(fileSystemGetMethod.ToString())
.append(", args: URI(")
.append(base_path_)
.append(")"));
std::string("Error when init flink env, JNI throws exception."));
}
file_system_instance_ = jniEnv->NewGlobalRef(fileSystemInstance);
jniEnv->DeleteLocalRef(fileSystemInstance);
return Status::OK();
}

Expand Down Expand Up @@ -856,28 +880,31 @@ IOStatus FlinkFileSystem::UnlockFile(FileLock* /*lock*/,

Status FlinkFileSystem::Create(const std::shared_ptr<FileSystem>& base,
const std::string& uri,
std::unique_ptr<FileSystem>* result) {
auto* fileSystem = new FlinkFileSystem(base, uri);
std::unique_ptr<FileSystem>* result,
jobject file_system_instance) {
auto* fileSystem = new FlinkFileSystem(base, uri, file_system_instance);
Status status = fileSystem->Init();
result->reset(fileSystem);
return status;
}

Status NewFlinkEnv(const std::string& uri,
std::unique_ptr<Env>* flinkFileSystem) {
std::unique_ptr<Env>* flinkFileSystem,
jobject file_system_instance) {
std::shared_ptr<FileSystem> fs;
Status s = NewFlinkFileSystem(uri, &fs);
Status s = NewFlinkFileSystem(uri, &fs, file_system_instance);
if (s.ok()) {
*flinkFileSystem = NewCompositeEnv(fs);
}
return s;
}

Status NewFlinkFileSystem(const std::string& uri,
std::shared_ptr<FileSystem>* fs) {
std::shared_ptr<FileSystem>* fs,
jobject file_system_instance) {
std::unique_ptr<FileSystem> flinkFileSystem;
Status s =
FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem);
Status s = FlinkFileSystem::Create(FileSystem::Default(), uri,
&flinkFileSystem, file_system_instance);
if (s.ok()) {
fs->reset(flinkFileSystem.release());
}
Expand Down
12 changes: 8 additions & 4 deletions env/flink/env_flink.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class FlinkFileSystem : public FileSystemWrapper {
// base_path
static Status Create(const std::shared_ptr<FileSystem>& /*base_fs*/,
const std::string& /*base_path*/,
std::unique_ptr<FileSystem>* /*fs*/);
std::unique_ptr<FileSystem>* /*fs*/,
jobject file_system_instance);

// Define some names
static const char* kClassName() { return "FlinkFileSystem"; }
Expand Down Expand Up @@ -103,7 +104,8 @@ class FlinkFileSystem : public FileSystemWrapper {
jobject file_system_instance_;

explicit FlinkFileSystem(const std::shared_ptr<FileSystem>& base,
const std::string& fsname);
const std::string& fsname,
jobject file_system_instance);

// Init FileSystem
Status Init();
Expand All @@ -126,8 +128,10 @@ class FlinkFileSystem : public FileSystemWrapper {
};

// Returns a `FlinkEnv` with base_path
Status NewFlinkEnv(const std::string& base_path, std::unique_ptr<Env>* env);
Status NewFlinkEnv(const std::string& base_path, std::unique_ptr<Env>* env,
jobject file_system_instance);
// Returns a `FlinkFileSystem` with base_path
Status NewFlinkFileSystem(const std::string& base_path,
std::shared_ptr<FileSystem>* fs);
std::shared_ptr<FileSystem>* fs,
jobject file_system_instance);
} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion env/flink/env_flink_test_suite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ void EnvFlinkTestSuites::runAllTestSuites() {
}

void EnvFlinkTestSuites::setUp() {
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_);
auto status =
ROCKSDB_NAMESPACE::NewFlinkEnv(base_path_, &flink_env_, nullptr);
if (!status.ok()) {
throw std::runtime_error("New FlinkEnv failed");
}
Expand Down
7 changes: 7 additions & 0 deletions env/flink/jni_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ IOStatus JavaClassCache::Init() {
cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SKIP]
.signature = "(J)J";

cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_CLOSE]
.javaClassAndName = cached_java_classes_[JC_FLINK_FS_INPUT_STREAM];
cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_CLOSE]
.methodName = "close";
cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_CLOSE]
.signature = "()V";

cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_WRITE]
.javaClassAndName = cached_java_classes_[JC_FLINK_FS_OUTPUT_STREAM];
cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_WRITE]
Expand Down
1 change: 1 addition & 0 deletions env/flink/jni_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class JavaClassCache {
JM_FLINK_FS_INPUT_STREAM_SEQ_READ,
JM_FLINK_FS_INPUT_STREAM_RANDOM_READ,
JM_FLINK_FS_INPUT_STREAM_SKIP,
JM_FLINK_FS_INPUT_STREAM_CLOSE,
JM_FLINK_FS_OUTPUT_STREAM_WRITE,
JM_FLINK_FS_OUTPUT_STREAM_FLUSH,
JM_FLINK_FS_OUTPUT_STREAM_SYNC,
Expand Down
8 changes: 5 additions & 3 deletions java/forstjni/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
/*
* Class: org_forstdb_FlinkEnv
* Method: createFlinkEnv
* Signature: (Ljava/lang/String;)J
* Signature: (Ljava/lang/String;Ljava/lang/Object;)J
*/
jlong Java_org_forstdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass,
jstring base_path) {
jstring base_path,
jobject file_system_instance_) {
jboolean has_exception = JNI_FALSE;
auto path =
ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, base_path, &has_exception);
Expand All @@ -41,7 +42,8 @@ jlong Java_org_forstdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass,
return 0;
}
std::unique_ptr<ROCKSDB_NAMESPACE::Env> flink_env;
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env);
auto status =
ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env, file_system_instance_);
if (!status.ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);
return 0;
Expand Down
6 changes: 3 additions & 3 deletions java/src/main/java/org/forstdb/FlinkEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public class FlinkEnv extends Env {
* @param basePath the base path string for the given Flink file system,
* formatted as "{fs-schema-supported-by-flink}://xxx"
*/
public FlinkEnv(final String basePath) {
super(createFlinkEnv(basePath));
public FlinkEnv(final String basePath, final Object fileSystem) {
super(createFlinkEnv(basePath, fileSystem));
}

private static native long createFlinkEnv(final String basePath);
private static native long createFlinkEnv(final String basePath, final Object fileSystem);

@Override protected final native void disposeInternal(final long handle);
}

0 comments on commit 0d346d4

Please sign in to comment.