Skip to content

Commit

Permalink
[FLINK-36868] Use file system methods with string parameters via JNI
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Dec 9, 2024
1 parent 9ad95c0 commit 582d659
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 210 deletions.
183 changes: 45 additions & 138 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,13 @@ class FlinkWritableFile : public FSWritableFile {

IOStatus Init() {
JNIEnv* jniEnv = getJNIEnv();
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(file_path_, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(file_path_.c_str());

JavaClassCache::JavaMethodContext fileSystemCreateMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_CREATE);
jobject fsDataOutputStream = jniEnv->CallObjectMethod(
file_system_instance_, fileSystemCreateMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, fileSystemCreateMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
if (fsDataOutputStream == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
Expand Down Expand Up @@ -184,19 +178,13 @@ class FlinkReadableFile : virtual public FSSequentialFile,

IOStatus Init() {
JNIEnv* jniEnv = getJNIEnv();
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(file_path_, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(file_path_.c_str());

JavaClassCache::JavaMethodContext openMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_OPEN);
jobject fsDataInputStream = jniEnv->CallObjectMethod(
file_system_instance_, openMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, openMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
if (fsDataInputStream == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
Expand Down Expand Up @@ -345,29 +333,13 @@ Status FlinkFileSystem::Init() {
JavaClassCache::JavaMethodContext fileSystemGetMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET);

JavaClassCache::JavaClassContext uriClass =
class_cache_->GetJClass(JavaClassCache::JC_URI);
JavaClassCache::JavaMethodContext uriConstructor =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR);

// Construct URI
jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str());
jobject uriInstance = jniEnv->NewObject(
uriClass.javaClass, uriConstructor.javaMethod, uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (uriInstance == nullptr) {
return CheckThenError(
std::string("NewObject Exception when Init FlinkFileSystem, ")
.append(uriClass.ToString())
.append(uriConstructor.ToString())
.append(", args: ")
.append(base_path_));
}

// Construct FileSystem
jobject fileSystemInstance = jniEnv->CallStaticObjectMethod(
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance);
jniEnv->DeleteLocalRef(uriInstance);
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod,
uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
Expand Down Expand Up @@ -472,23 +444,17 @@ IOStatus FlinkFileSystem::FileExists(const std::string& file_name,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) {
std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
JNIEnv* jniEnv = getJNIEnv();
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call exist method
JNIEnv* jniEnv = getJNIEnv();
JavaClassCache::JavaMethodContext existsMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_EXISTS);
jboolean exists = jniEnv->CallBooleanMethod(
file_system_instance_, existsMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, existsMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);

status = CurrentStatus([filePath]() {
IOStatus status = CurrentStatus([filePath]() {
return std::string("Exception when FileExists, path: ").append(filePath);
});
if (!status.ok()) {
Expand All @@ -513,21 +479,15 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,
}

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}

JNIEnv* jniEnv = getJNIEnv();
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

JavaClassCache::JavaMethodContext listStatusMethod = class_cache_->GetJMethod(
JavaClassCache::JM_FLINK_FILE_SYSTEM_LIST_STATUS);

auto fileStatusArray = (jobjectArray)jniEnv->CallObjectMethod(
file_system_instance_, listStatusMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, listStatusMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
if (fileStatusArray == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
Expand All @@ -548,26 +508,14 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,

JavaClassCache::JavaMethodContext getPathMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_STATUS_GET_PATH);
jobject subPath =
jniEnv->CallObjectMethod(fileStatusObj, getPathMethod.javaMethod);
jniEnv->DeleteLocalRef(fileStatusObj);
if (subPath == nullptr || jniEnv->ExceptionCheck()) {
jniEnv->DeleteLocalRef(fileStatusArray);
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
.append(getPathMethod.ToString()));
}

JavaClassCache::JavaMethodContext pathToStringMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_PATH_TO_STRING);
auto subPathStr = (jstring)jniEnv->CallObjectMethod(
subPath, pathToStringMethod.javaMethod);
jniEnv->DeleteLocalRef(subPath);
fileStatusObj, getPathMethod.javaMethod);
jniEnv->DeleteLocalRef(fileStatusObj);
if (subPathStr == nullptr || jniEnv->ExceptionCheck()) {
jniEnv->DeleteLocalRef(fileStatusArray);
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
.append(pathToStringMethod.ToString()));
.append(getPathMethod.ToString()));
}

const char* str = jniEnv->GetStringUTFChars(subPathStr, nullptr);
Expand Down Expand Up @@ -603,25 +551,19 @@ IOStatus FlinkFileSystem::Delete(const std::string& file_name,
.append(ConstructPath(file_name)))
: fileExistsStatus;
}
JNIEnv* jniEnv = getJNIEnv();

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call delete method
JNIEnv* jniEnv = getJNIEnv();
JavaClassCache::JavaMethodContext deleteMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_DELETE);
jboolean deleted = jniEnv->CallBooleanMethod(
file_system_instance_, deleteMethod.javaMethod, pathInstance, recursive);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, deleteMethod.javaMethod, pathString, recursive);
jniEnv->DeleteLocalRef(pathString);

status = CurrentStatus([filePath]() {
IOStatus status = CurrentStatus([filePath]() {
return std::string("Exception when Delete, path: ").append(filePath);
});
if (!status.ok()) {
Expand Down Expand Up @@ -652,21 +594,15 @@ IOStatus FlinkFileSystem::CreateDirIfMissing(const std::string& file_name,
JNIEnv* jniEnv = getJNIEnv();

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call mkdirs method
JavaClassCache::JavaMethodContext mkdirMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_MKDIR);
jboolean created = jniEnv->CallBooleanMethod(
file_system_instance_, mkdirMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
status = CurrentStatus([filePath]() {
file_system_instance_, mkdirMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
IOStatus status = CurrentStatus([filePath]() {
return std::string("Exception when CreateDirIfMissing, path: ")
.append(filePath);
});
Expand Down Expand Up @@ -722,22 +658,18 @@ IOStatus FlinkFileSystem::GetFileStatus(const std::string& file_name,
: status;
}

JNIEnv* jniEnv = getJNIEnv();

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
status = class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call getFileStatus method
JNIEnv* jniEnv = getJNIEnv();
JavaClassCache::JavaMethodContext getFileStatusMethod =
class_cache_->GetJMethod(
JavaClassCache::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS);
*fileStatus = jniEnv->CallObjectMethod(
file_system_instance_, getFileStatusMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, getFileStatusMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);

return CurrentStatus([filePath]() {
return std::string("Exception when GetFileStatus, path: ").append(filePath);
Expand Down Expand Up @@ -818,30 +750,17 @@ IOStatus FlinkFileSystem::RenameFile(const std::string& src,
JNIEnv* jniEnv = getJNIEnv();

std::string srcFilePath = ConstructPath(src);
// Construct src Path Instance
jobject srcPathInstance;
status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance);
if (!status.ok()) {
return status;
}

jstring srcPathString = jniEnv->NewStringUTF(srcFilePath.c_str());
std::string targetFilePath = ConstructPath(target);
// Construct target Path Instance
jobject targetPathInstance;
status =
class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance);
if (!status.ok()) {
jniEnv->DeleteLocalRef(srcPathInstance);
return status;
}
jstring targetPathString = jniEnv->NewStringUTF(targetFilePath.c_str());

JavaClassCache::JavaMethodContext renameMethod = class_cache_->GetJMethod(
JavaClassCache::JM_FLINK_FILE_SYSTEM_RENAME_FILE);
jboolean renamed =
jniEnv->CallBooleanMethod(file_system_instance_, renameMethod.javaMethod,
srcPathInstance, targetPathInstance);
jniEnv->DeleteLocalRef(srcPathInstance);
jniEnv->DeleteLocalRef(targetPathInstance);
srcPathString, targetPathString);
jniEnv->DeleteLocalRef(srcPathString);
jniEnv->DeleteLocalRef(targetPathString);

status = CurrentStatus([srcFilePath, targetFilePath]() {
return std::string("Exception when RenameFile, src: ")
Expand Down Expand Up @@ -895,30 +814,18 @@ IOStatus FlinkFileSystem::LinkFile(const std::string& src,
JNIEnv* jniEnv = getJNIEnv();

std::string srcFilePath = ConstructPath(src);
// Construct src Path Instance
jobject srcPathInstance;
status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance);
if (!status.ok()) {
return status;
}
jstring srcPathString = jniEnv->NewStringUTF(srcFilePath.c_str());

std::string targetFilePath = ConstructPath(target);
// Construct target Path Instance
jobject targetPathInstance;
status =
class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance);
if (!status.ok()) {
jniEnv->DeleteLocalRef(srcPathInstance);
return status;
}
jstring targetPathString = jniEnv->NewStringUTF(targetFilePath.c_str());

JavaClassCache::JavaMethodContext linkMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_LINK_FILE);
jint linked =
jniEnv->CallIntMethod(file_system_instance_, linkMethod.javaMethod,
srcPathInstance, targetPathInstance);
jniEnv->DeleteLocalRef(srcPathInstance);
jniEnv->DeleteLocalRef(targetPathInstance);
srcPathString, targetPathString);
jniEnv->DeleteLocalRef(srcPathString);
jniEnv->DeleteLocalRef(targetPathString);

status = CurrentStatus([srcFilePath, targetFilePath]() {
return std::string("Exception when LinkFile, src: ")
Expand Down
Loading

0 comments on commit 582d659

Please sign in to comment.