From 582d659bfc3e126663a3bd31d3cb28b273cbbb16 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Mon, 9 Dec 2024 20:18:43 +0800 Subject: [PATCH] [FLINK-36868] Use file system methods with string parameters via JNI --- env/flink/env_flink.cc | 183 +++++------------- env/flink/jni_helper.cc | 76 ++------ env/flink/jni_helper.h | 10 - .../flink/state/forst/fs/ForStFileStatus.java | 60 ++++++ .../forst/fs/StringifiedForStFileSystem.java | 77 ++++++++ 5 files changed, 196 insertions(+), 210 deletions(-) create mode 100644 java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFileStatus.java create mode 100644 java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/StringifiedForStFileSystem.java diff --git a/env/flink/env_flink.cc b/env/flink/env_flink.cc index f0eb58039..e8aadb3a0 100644 --- a/env/flink/env_flink.cc +++ b/env/flink/env_flink.cc @@ -53,19 +53,13 @@ class FlinkWritableFile : public FSWritableFile { IOStatus Init() { JNIEnv* jniEnv = getJNIEnv(); - // Construct Path Instance - jobject pathInstance; - IOStatus status = - class_cache_->ConstructPathInstance(file_path_, &pathInstance); - if (!status.ok()) { - return status; - } + jstring pathString = jniEnv->NewStringUTF(file_path_.c_str()); JavaClassCache::JavaMethodContext fileSystemCreateMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_CREATE); jobject fsDataOutputStream = jniEnv->CallObjectMethod( - file_system_instance_, fileSystemCreateMethod.javaMethod, pathInstance); - jniEnv->DeleteLocalRef(pathInstance); + file_system_instance_, fileSystemCreateMethod.javaMethod, pathString); + jniEnv->DeleteLocalRef(pathString); if (fsDataOutputStream == nullptr || jniEnv->ExceptionCheck()) { return CheckThenError( std::string( @@ -184,19 +178,13 @@ class FlinkReadableFile : virtual public FSSequentialFile, IOStatus Init() { JNIEnv* jniEnv = getJNIEnv(); - // Construct Path Instance - jobject pathInstance; - IOStatus status = - class_cache_->ConstructPathInstance(file_path_, &pathInstance); - if (!status.ok()) { - return status; - } + jstring pathString = jniEnv->NewStringUTF(file_path_.c_str()); JavaClassCache::JavaMethodContext openMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_OPEN); jobject fsDataInputStream = jniEnv->CallObjectMethod( - file_system_instance_, openMethod.javaMethod, pathInstance); - jniEnv->DeleteLocalRef(pathInstance); + file_system_instance_, openMethod.javaMethod, pathString); + jniEnv->DeleteLocalRef(pathString); if (fsDataInputStream == nullptr || jniEnv->ExceptionCheck()) { return CheckThenError( std::string( @@ -345,29 +333,13 @@ Status FlinkFileSystem::Init() { JavaClassCache::JavaMethodContext fileSystemGetMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET); - JavaClassCache::JavaClassContext uriClass = - class_cache_->GetJClass(JavaClassCache::JC_URI); - JavaClassCache::JavaMethodContext uriConstructor = - class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR); - - // Construct URI jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str()); - jobject uriInstance = jniEnv->NewObject( - uriClass.javaClass, uriConstructor.javaMethod, uriStringArg); - jniEnv->DeleteLocalRef(uriStringArg); - if (uriInstance == nullptr) { - return CheckThenError( - std::string("NewObject Exception when Init FlinkFileSystem, ") - .append(uriClass.ToString()) - .append(uriConstructor.ToString()) - .append(", args: ") - .append(base_path_)); - } // Construct FileSystem jobject fileSystemInstance = jniEnv->CallStaticObjectMethod( - fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance); - jniEnv->DeleteLocalRef(uriInstance); + fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, + uriStringArg); + jniEnv->DeleteLocalRef(uriStringArg); if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) { return CheckThenError( std::string( @@ -472,23 +444,17 @@ IOStatus FlinkFileSystem::FileExists(const std::string& file_name, const IOOptions& /*options*/, IODebugContext* /*dbg*/) { std::string filePath = ConstructPath(file_name); - // Construct Path Instance - jobject pathInstance; - IOStatus status = - class_cache_->ConstructPathInstance(filePath, &pathInstance); - if (!status.ok()) { - return status; - } + JNIEnv* jniEnv = getJNIEnv(); + jstring pathString = jniEnv->NewStringUTF(filePath.c_str()); // Call exist method - JNIEnv* jniEnv = getJNIEnv(); JavaClassCache::JavaMethodContext existsMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_EXISTS); jboolean exists = jniEnv->CallBooleanMethod( - file_system_instance_, existsMethod.javaMethod, pathInstance); - jniEnv->DeleteLocalRef(pathInstance); + file_system_instance_, existsMethod.javaMethod, pathString); + jniEnv->DeleteLocalRef(pathString); - status = CurrentStatus([filePath]() { + IOStatus status = CurrentStatus([filePath]() { return std::string("Exception when FileExists, path: ").append(filePath); }); if (!status.ok()) { @@ -513,21 +479,15 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name, } std::string filePath = ConstructPath(file_name); - // Construct Path Instance - jobject pathInstance; - IOStatus status = - class_cache_->ConstructPathInstance(filePath, &pathInstance); - if (!status.ok()) { - return status; - } - JNIEnv* jniEnv = getJNIEnv(); + jstring pathString = jniEnv->NewStringUTF(filePath.c_str()); + JavaClassCache::JavaMethodContext listStatusMethod = class_cache_->GetJMethod( JavaClassCache::JM_FLINK_FILE_SYSTEM_LIST_STATUS); auto fileStatusArray = (jobjectArray)jniEnv->CallObjectMethod( - file_system_instance_, listStatusMethod.javaMethod, pathInstance); - jniEnv->DeleteLocalRef(pathInstance); + file_system_instance_, listStatusMethod.javaMethod, pathString); + jniEnv->DeleteLocalRef(pathString); if (fileStatusArray == nullptr || jniEnv->ExceptionCheck()) { return CheckThenError( std::string("Exception when CallObjectMethod in GetChildren, ") @@ -548,26 +508,14 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name, JavaClassCache::JavaMethodContext getPathMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_STATUS_GET_PATH); - jobject subPath = - jniEnv->CallObjectMethod(fileStatusObj, getPathMethod.javaMethod); - jniEnv->DeleteLocalRef(fileStatusObj); - if (subPath == nullptr || jniEnv->ExceptionCheck()) { - jniEnv->DeleteLocalRef(fileStatusArray); - return CheckThenError( - std::string("Exception when CallObjectMethod in GetChildren, ") - .append(getPathMethod.ToString())); - } - - JavaClassCache::JavaMethodContext pathToStringMethod = - class_cache_->GetJMethod(JavaClassCache::JM_FLINK_PATH_TO_STRING); auto subPathStr = (jstring)jniEnv->CallObjectMethod( - subPath, pathToStringMethod.javaMethod); - jniEnv->DeleteLocalRef(subPath); + fileStatusObj, getPathMethod.javaMethod); + jniEnv->DeleteLocalRef(fileStatusObj); if (subPathStr == nullptr || jniEnv->ExceptionCheck()) { jniEnv->DeleteLocalRef(fileStatusArray); return CheckThenError( std::string("Exception when CallObjectMethod in GetChildren, ") - .append(pathToStringMethod.ToString())); + .append(getPathMethod.ToString())); } const char* str = jniEnv->GetStringUTFChars(subPathStr, nullptr); @@ -603,25 +551,19 @@ IOStatus FlinkFileSystem::Delete(const std::string& file_name, .append(ConstructPath(file_name))) : fileExistsStatus; } + JNIEnv* jniEnv = getJNIEnv(); std::string filePath = ConstructPath(file_name); - // Construct Path Instance - jobject pathInstance; - IOStatus status = - class_cache_->ConstructPathInstance(filePath, &pathInstance); - if (!status.ok()) { - return status; - } + jstring pathString = jniEnv->NewStringUTF(filePath.c_str()); // Call delete method - JNIEnv* jniEnv = getJNIEnv(); JavaClassCache::JavaMethodContext deleteMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_DELETE); jboolean deleted = jniEnv->CallBooleanMethod( - file_system_instance_, deleteMethod.javaMethod, pathInstance, recursive); - jniEnv->DeleteLocalRef(pathInstance); + file_system_instance_, deleteMethod.javaMethod, pathString, recursive); + jniEnv->DeleteLocalRef(pathString); - status = CurrentStatus([filePath]() { + IOStatus status = CurrentStatus([filePath]() { return std::string("Exception when Delete, path: ").append(filePath); }); if (!status.ok()) { @@ -652,21 +594,15 @@ IOStatus FlinkFileSystem::CreateDirIfMissing(const std::string& file_name, JNIEnv* jniEnv = getJNIEnv(); std::string filePath = ConstructPath(file_name); - // Construct Path Instance - jobject pathInstance; - IOStatus status = - class_cache_->ConstructPathInstance(filePath, &pathInstance); - if (!status.ok()) { - return status; - } + jstring pathString = jniEnv->NewStringUTF(filePath.c_str()); // Call mkdirs method JavaClassCache::JavaMethodContext mkdirMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_MKDIR); jboolean created = jniEnv->CallBooleanMethod( - file_system_instance_, mkdirMethod.javaMethod, pathInstance); - jniEnv->DeleteLocalRef(pathInstance); - status = CurrentStatus([filePath]() { + file_system_instance_, mkdirMethod.javaMethod, pathString); + jniEnv->DeleteLocalRef(pathString); + IOStatus status = CurrentStatus([filePath]() { return std::string("Exception when CreateDirIfMissing, path: ") .append(filePath); }); @@ -722,22 +658,18 @@ IOStatus FlinkFileSystem::GetFileStatus(const std::string& file_name, : status; } + JNIEnv* jniEnv = getJNIEnv(); + std::string filePath = ConstructPath(file_name); - // Construct Path Instance - jobject pathInstance; - status = class_cache_->ConstructPathInstance(filePath, &pathInstance); - if (!status.ok()) { - return status; - } + jstring pathString = jniEnv->NewStringUTF(filePath.c_str()); // Call getFileStatus method - JNIEnv* jniEnv = getJNIEnv(); JavaClassCache::JavaMethodContext getFileStatusMethod = class_cache_->GetJMethod( JavaClassCache::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS); *fileStatus = jniEnv->CallObjectMethod( - file_system_instance_, getFileStatusMethod.javaMethod, pathInstance); - jniEnv->DeleteLocalRef(pathInstance); + file_system_instance_, getFileStatusMethod.javaMethod, pathString); + jniEnv->DeleteLocalRef(pathString); return CurrentStatus([filePath]() { return std::string("Exception when GetFileStatus, path: ").append(filePath); @@ -818,30 +750,17 @@ IOStatus FlinkFileSystem::RenameFile(const std::string& src, JNIEnv* jniEnv = getJNIEnv(); std::string srcFilePath = ConstructPath(src); - // Construct src Path Instance - jobject srcPathInstance; - status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance); - if (!status.ok()) { - return status; - } - + jstring srcPathString = jniEnv->NewStringUTF(srcFilePath.c_str()); std::string targetFilePath = ConstructPath(target); - // Construct target Path Instance - jobject targetPathInstance; - status = - class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance); - if (!status.ok()) { - jniEnv->DeleteLocalRef(srcPathInstance); - return status; - } + jstring targetPathString = jniEnv->NewStringUTF(targetFilePath.c_str()); JavaClassCache::JavaMethodContext renameMethod = class_cache_->GetJMethod( JavaClassCache::JM_FLINK_FILE_SYSTEM_RENAME_FILE); jboolean renamed = jniEnv->CallBooleanMethod(file_system_instance_, renameMethod.javaMethod, - srcPathInstance, targetPathInstance); - jniEnv->DeleteLocalRef(srcPathInstance); - jniEnv->DeleteLocalRef(targetPathInstance); + srcPathString, targetPathString); + jniEnv->DeleteLocalRef(srcPathString); + jniEnv->DeleteLocalRef(targetPathString); status = CurrentStatus([srcFilePath, targetFilePath]() { return std::string("Exception when RenameFile, src: ") @@ -895,30 +814,18 @@ IOStatus FlinkFileSystem::LinkFile(const std::string& src, JNIEnv* jniEnv = getJNIEnv(); std::string srcFilePath = ConstructPath(src); - // Construct src Path Instance - jobject srcPathInstance; - status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance); - if (!status.ok()) { - return status; - } + jstring srcPathString = jniEnv->NewStringUTF(srcFilePath.c_str()); std::string targetFilePath = ConstructPath(target); - // Construct target Path Instance - jobject targetPathInstance; - status = - class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance); - if (!status.ok()) { - jniEnv->DeleteLocalRef(srcPathInstance); - return status; - } + jstring targetPathString = jniEnv->NewStringUTF(targetFilePath.c_str()); JavaClassCache::JavaMethodContext linkMethod = class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_LINK_FILE); jint linked = jniEnv->CallIntMethod(file_system_instance_, linkMethod.javaMethod, - srcPathInstance, targetPathInstance); - jniEnv->DeleteLocalRef(srcPathInstance); - jniEnv->DeleteLocalRef(targetPathInstance); + srcPathString, targetPathString); + jniEnv->DeleteLocalRef(srcPathString); + jniEnv->DeleteLocalRef(targetPathString); status = CurrentStatus([srcFilePath, targetFilePath]() { return std::string("Exception when LinkFile, src: ") diff --git a/env/flink/jni_helper.cc b/env/flink/jni_helper.cc index 652780aec..8b57db6d6 100644 --- a/env/flink/jni_helper.cc +++ b/env/flink/jni_helper.cc @@ -48,17 +48,14 @@ IOStatus JavaClassCache::Create(JNIEnv* env, IOStatus JavaClassCache::Init() { // Set all class names - cached_java_classes_[CachedJavaClass::JC_URI].className = "java/net/URI"; cached_java_classes_[CachedJavaClass::JC_BYTE_BUFFER].className = "java/nio/ByteBuffer"; cached_java_classes_[CachedJavaClass::JC_THROWABLE].className = "java/lang/Throwable"; - cached_java_classes_[CachedJavaClass::JC_FLINK_PATH].className = - "org/apache/flink/core/fs/Path"; cached_java_classes_[CachedJavaClass::JC_FLINK_FILE_SYSTEM].className = - "org/apache/flink/state/forst/fs/ForStFlinkFileSystem"; + "org/apache/flink/state/forst/fs/StringifiedForStFileSystem"; cached_java_classes_[CachedJavaClass::JC_FLINK_FILE_STATUS].className = - "org/apache/flink/core/fs/FileStatus"; + "org/apache/flink/state/forst/fs/ForStFileStatus"; cached_java_classes_[CachedJavaClass::JC_FLINK_FS_INPUT_STREAM].className = "org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream"; cached_java_classes_[CachedJavaClass::JC_FLINK_FS_OUTPUT_STREAM].className = @@ -76,33 +73,13 @@ IOStatus JavaClassCache::Init() { } // Set all method names, signatures and class infos - cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR] - .javaClassAndName = cached_java_classes_[JC_FLINK_PATH]; - cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].methodName = - ""; - cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].signature = - "(Ljava/lang/String;)V"; - - cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING] - .javaClassAndName = cached_java_classes_[JC_FLINK_PATH]; - cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING].methodName = - "toString"; - cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING].signature = - "()Ljava/lang/String;"; - - cached_java_methods_[CachedJavaMethod::JM_FLINK_URI_CONSTRUCTOR] - .javaClassAndName = cached_java_classes_[JC_URI]; - cached_java_methods_[CachedJavaMethod::JM_FLINK_URI_CONSTRUCTOR].methodName = - ""; - cached_java_methods_[CachedJavaMethod::JM_FLINK_URI_CONSTRUCTOR].signature = - "(Ljava/lang/String;)V"; - cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].methodName = "get"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].signature = - "(Ljava/net/URI;)Lorg/apache/flink/core/fs/FileSystem;"; + "(Ljava/lang/String;)Lorg/apache/flink/state/forst/fs/" + "StringifiedForStFileSystem;"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].isStatic = true; @@ -111,7 +88,7 @@ IOStatus JavaClassCache::Init() { cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_EXISTS] .methodName = "exists"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_EXISTS] - .signature = "(Lorg/apache/flink/core/fs/Path;)Z"; + .signature = "(Ljava/lang/String;)Z"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LIST_STATUS] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; @@ -119,7 +96,7 @@ IOStatus JavaClassCache::Init() { .methodName = "listStatus"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LIST_STATUS] .signature = - "(Lorg/apache/flink/core/fs/Path;)[Lorg/apache/flink/core/fs/FileStatus;"; + "(Ljava/lang/String;)[Lorg/apache/flink/state/forst/fs/ForStFileStatus;"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; @@ -127,44 +104,42 @@ IOStatus JavaClassCache::Init() { .methodName = "getFileStatus"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS] .signature = - "(Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/core/fs/FileStatus;"; + "(Ljava/lang/String;)Lorg/apache/flink/state/forst/fs/ForStFileStatus;"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_DELETE] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_DELETE] .methodName = "delete"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_DELETE] - .signature = "(Lorg/apache/flink/core/fs/Path;Z)Z"; + .signature = "(Ljava/lang/String;Z)Z"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_MKDIR] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_MKDIR] .methodName = "mkdirs"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_MKDIR].signature = - "(Lorg/apache/flink/core/fs/Path;)Z"; + "(Ljava/lang/String;)Z"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_RENAME_FILE] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_RENAME_FILE] .methodName = "rename"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_RENAME_FILE] - .signature = - "(Lorg/apache/flink/core/fs/Path;Lorg/apache/flink/core/fs/Path;)Z"; + .signature = "(Ljava/lang/String;Ljava/lang/String;)Z"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE] .methodName = "link"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE] - .signature = - "(Lorg/apache/flink/core/fs/Path;Lorg/apache/flink/core/fs/Path;)I"; + .signature = "(Ljava/lang/String;Ljava/lang/String;)I"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN].methodName = "open"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN].signature = - "(Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/state/forst/fs/" + "(Ljava/lang/String;)Lorg/apache/flink/state/forst/fs/" "ByteBufferReadableFSDataInputStream;"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SEQ_READ] @@ -229,7 +204,7 @@ IOStatus JavaClassCache::Init() { .methodName = "create"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_CREATE] .signature = - "(Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/state/forst/fs/" + "(Ljava/lang/String;)Lorg/apache/flink/state/forst/fs/" "ByteBufferWritableFSDataOutputStream;"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_PATH] @@ -237,7 +212,7 @@ IOStatus JavaClassCache::Init() { cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_PATH] .methodName = "getPath"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_PATH] - .signature = "()Lorg/apache/flink/core/fs/Path;"; + .signature = "()Ljava/lang/String;"; cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_LEN] .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_STATUS]; @@ -315,29 +290,6 @@ JavaClassCache::JavaMethodContext JavaClassCache::GetJMethod( return cached_java_methods_[cachedJavaMethod]; } -IOStatus JavaClassCache::ConstructPathInstance(const std::string& file_path, - jobject* pathInstance) { - JNIEnv* jniEnv = getJNIEnv(); - JavaClassCache::JavaClassContext pathClass = - GetJClass(JavaClassCache::JC_FLINK_PATH); - JavaClassCache::JavaMethodContext pathConstructor = - GetJMethod(JavaClassCache::JM_FLINK_PATH_CONSTRUCTOR); - jstring pathString = jniEnv->NewStringUTF(file_path.c_str()); - jobject tempPathInstance = jniEnv->NewObject( - pathClass.javaClass, pathConstructor.javaMethod, pathString); - jniEnv->DeleteLocalRef(pathString); - if (tempPathInstance == nullptr) { - return CheckThenError(std::string("Exception when ConstructPathInstance, ") - .append(pathClass.ToString()) - .append(pathConstructor.ToString()) - .append(", args: Path(") - .append(file_path) - .append(")")); - } - *pathInstance = tempPathInstance; - return IOStatus::OK(); -} - IOStatus CurrentStatus( const std::function& exceptionMessageIfError) { JNIEnv* jniEnv = getJNIEnv(); diff --git a/env/flink/jni_helper.h b/env/flink/jni_helper.h index 19bc89371..a495a5b71 100644 --- a/env/flink/jni_helper.h +++ b/env/flink/jni_helper.h @@ -29,10 +29,8 @@ class JavaClassCache { public: // Frequently-used class type representing jclasses which will be cached. typedef enum { - JC_URI, JC_BYTE_BUFFER, JC_THROWABLE, - JC_FLINK_PATH, JC_FLINK_FILE_SYSTEM, JC_FLINK_FILE_STATUS, JC_FLINK_FS_INPUT_STREAM, @@ -42,9 +40,6 @@ class JavaClassCache { // Frequently-used method type representing jmethods which will be cached. typedef enum { - JM_FLINK_PATH_CONSTRUCTOR, - JM_FLINK_PATH_TO_STRING, - JM_FLINK_URI_CONSTRUCTOR, JM_FLINK_FILE_SYSTEM_GET, JM_FLINK_FILE_SYSTEM_EXISTS, JM_FLINK_FILE_SYSTEM_LIST_STATUS, @@ -113,11 +108,6 @@ class JavaClassCache { // Get JavaMethodContext by specific CachedJavaMethod. JavaMethodContext GetJMethod(CachedJavaMethod cachedJavaMethod); - // Construct Java Path Instance based on cached classes and method related to - // Path. - IOStatus ConstructPathInstance(const std::string& /*file_path*/, - jobject* /*pathInstance*/); - private: JNIEnv* jni_env_; JavaClassContext cached_java_classes_[CachedJavaClass::NUM_CACHED_CLASSES]; diff --git a/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFileStatus.java b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFileStatus.java new file mode 100644 index 000000000..7fb238d7e --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/ForStFileStatus.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import org.apache.flink.core.fs.FileStatus; + +/** + * A wrapper of {@link FileStatus} just for ForSt. It will delegate all the methods from {@link FileStatus} and provide a version of primitive types. This class is used by JNI. + */ +public class ForStFileStatus { + private final FileStatus fileStatus; + + public ForStFileStatus(FileStatus fileStatus) { + this.fileStatus = fileStatus; + } + + public long getLen() { + return fileStatus.getLen(); + } + + public long getBlockSize() { + return fileStatus.getBlockSize(); + } + + public short getReplication() { + return fileStatus.getReplication(); + } + + public long getModificationTime() { + return fileStatus.getModificationTime(); + } + + public long getAccessTime() { + return fileStatus.getAccessTime(); + } + + public boolean isDir() { + return fileStatus.isDir(); + } + + public String getPath() { + return fileStatus.getPath().toString(); + } +} diff --git a/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/StringifiedForStFileSystem.java b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/StringifiedForStFileSystem.java new file mode 100644 index 000000000..345581c20 --- /dev/null +++ b/java/flinktestmock/src/main/java/org/apache/flink/state/forst/fs/StringifiedForStFileSystem.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import org.apache.flink.core.fs.Path; + +/** + * A {@link ForStFlinkFileSystem} stringifies all the parameters of all methods. + */ +public class StringifiedForStFileSystem { + private ForStFlinkFileSystem fileSystem; + + public StringifiedForStFileSystem(ForStFlinkFileSystem fileSystem) { + this.fileSystem = fileSystem; + } + + public static StringifiedForStFileSystem get(String uri) throws IOException { + return new StringifiedForStFileSystem(ForStFlinkFileSystem.get(URI.create(uri))); + } + + public boolean exists(final String path) throws IOException { + return fileSystem.exists(new Path(path)); + } + + public ForStFileStatus getFileStatus(String path) throws IOException { + return new ForStFileStatus(fileSystem.getFileStatus(new Path(path))); + } + + public ForStFileStatus[] listStatus(String path) throws IOException { + return Arrays.stream(fileSystem.listStatus(new Path(path))) + .map(ForStFileStatus::new) + .toArray(ForStFileStatus[]::new); + } + + public boolean delete(String path, boolean recursive) throws IOException { + return fileSystem.delete(new Path(path), recursive); + } + + public boolean mkdirs(String path) throws IOException { + return fileSystem.mkdirs(new Path(path)); + } + + public boolean rename(String src, String dst) throws IOException { + return fileSystem.rename(new Path(src), new Path(dst)); + } + + public ByteBufferReadableFSDataInputStream open(String path) throws IOException { + return fileSystem.open(new Path(path)); + } + + public ByteBufferWritableFSDataOutputStream create(String path) throws IOException { + return fileSystem.create(new Path(path)); + } + + public int link(String src, String dst) throws IOException { + return fileSystem.link(new Path(src), new Path(dst)); + } +}