Skip to content

Commit

Permalink
[GLUTEN-7243][VL] Fix hanging by cross-task spilling (#7479)
Browse files Browse the repository at this point in the history
Closes #7243
  • Loading branch information
zhztheplayer authored Oct 14, 2024
1 parent d6e048a commit 20f2d20
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}

/**
 * Forwards the temporary write-files path to NativePlanEvaluator.
 *
 * @param path     path handed to NativePlanEvaluator.injectWriteFilesTempPath
 * @param fileName accepted by the signature but not read anywhere in this body
 */
// NOTE(review): this hunk is rendered without +/- markers. The instance-based pair
// (create() followed by transKernel.injectWriteFilesTempPath) and the static call below
// look like the before/after forms of the same step - confirm against the commit.
override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
val transKernel = NativePlanEvaluator.create()
transKernel.injectWriteFilesTempPath(path)
NativePlanEvaluator.injectWriteFilesTempPath(path)
}

/** Generate Iterator[ColumnarBatch] for first stage. */
Expand Down
7 changes: 7 additions & 0 deletions cpp/core/compute/Runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,11 @@ void Runtime::release(Runtime* runtime) {
delete runtime;
}

// Returns a pointer to the calling thread's write-files temp path slot.
//
// The slot is a function-local thread_local object, so every thread gets its own
// std::optional, which is empty until something is assigned through the returned pointer.
// It is kept per-thread to conform to Java side ColumnarWriteFilesExec's design.
// FIXME: Pass the path through relevant member functions.
std::optional<std::string>* Runtime::localWriteFilesTempPath() {
  // A block-scope thread_local variable has static storage semantics per thread.
  thread_local std::optional<std::string> threadTempPath;
  return &threadTempPath;
}

} // namespace gluten
4 changes: 1 addition & 3 deletions cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
std::unique_ptr<AllocationListener> listener,
const std::unordered_map<std::string, std::string>& sessionConf = {});
static void release(Runtime*);
static std::optional<std::string>* localWriteFilesTempPath();

Runtime(std::shared_ptr<MemoryManager> memoryManager, const std::unordered_map<std::string, std::string>& confMap)
: memoryManager_(memoryManager), confMap_(confMap) {}
Expand All @@ -74,8 +75,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

virtual std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) = 0;

virtual void injectWriteFilesTempPath(const std::string& path) = 0;

// Just for benchmark
::substrait::Plan& getPlan() {
return substraitPlan_;
Expand Down Expand Up @@ -134,7 +133,6 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

::substrait::Plan substraitPlan_;
std::vector<::substrait::ReadRel_LocalFiles> localFiles_;
std::optional<std::string> writeFilesTempPath_;
SparkTaskInfo taskInfo_;
};
} // namespace gluten
7 changes: 2 additions & 5 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,13 @@ JNIEXPORT jstring JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrap

// JNI entry point for PlanEvaluatorJniWrapper.injectWriteFilesTempPath.
//
// Copies the bytes of the Java byte array `path` into a std::string and records it as the
// write-files temp path.
//
// NOTE(review): this hunk is rendered without +/- markers, so it shows two parameter lines
// for the second argument (`jobject wrapper,` and `jclass,`) and two ways of storing the
// path (through the runtime obtained from `wrapper`, and through
// gluten::Runtime::localWriteFilesTempPath()). These look like the before/after forms of the
// same change - confirm against the commit before treating this text as compilable.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_injectWriteFilesTempPath( // NOLINT
JNIEnv* env,
jobject wrapper,
jclass,
jbyteArray path) {
JNI_METHOD_START

// The string is built from the array elements plus an explicit length taken from the array.
auto len = env->GetArrayLength(path);
auto safeArray = gluten::getByteArrayElementsSafe(env, path);
std::string pathStr(reinterpret_cast<char*>(safeArray.elems()), len);
// Runtime-based store: looks up the runtime bound to `wrapper` and hands it the path.
auto ctx = gluten::getRuntime(env, wrapper);
ctx->injectWriteFilesTempPath(pathStr);

// Thread-local store: assigns the path through the pointer returned by the static accessor.
*gluten::Runtime::localWriteFilesTempPath() = pathStr;
JNI_METHOD_END()
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ auto BM_Generic = [](::benchmark::State& state,
return static_cast<FileReaderIterator*>(iter->getInputIter());
});
}
runtime->injectWriteFilesTempPath(FLAGS_write_path);
*Runtime::localWriteFilesTempPath() = FLAGS_write_path;
runtime->parsePlan(reinterpret_cast<uint8_t*>(plan.data()), plan.size(), std::nullopt);
for (auto& split : splits) {
runtime->parseSplitInfo(reinterpret_cast<uint8_t*>(split.data()), split.size(), std::nullopt);
Expand Down
7 changes: 2 additions & 5 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ std::string VeloxRuntime::planString(bool details, const std::unordered_map<std:
return veloxPlan->toString(details, true);
}

void VeloxRuntime::injectWriteFilesTempPath(const std::string& path) {
writeFilesTempPath_ = path;
}

VeloxMemoryManager* VeloxRuntime::memoryManager() {
return vmm_;
}
Expand All @@ -142,7 +138,8 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
const std::unordered_map<std::string, std::string>& sessionConf) {
LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" << printConfig(confMap_);

VeloxPlanConverter veloxPlanConverter(inputs, vmm_->getLeafMemoryPool().get(), sessionConf, writeFilesTempPath_);
VeloxPlanConverter veloxPlanConverter(
inputs, vmm_->getLeafMemoryPool().get(), sessionConf, *localWriteFilesTempPath());
veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_));

// Scan node can be required.
Expand Down
2 changes: 0 additions & 2 deletions cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ class VeloxRuntime final : public Runtime {

std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) override;

void injectWriteFilesTempPath(const std::string& path) override;

void dumpConf(const std::string& path) override;

std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
Expand Down
3 changes: 0 additions & 3 deletions cpp/velox/tests/RuntimeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ class DummyRuntime final : public Runtime {
std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) override {
throw GlutenException("Not yet implemented");
}
void injectWriteFilesTempPath(const std::string& path) override {
throw GlutenException("Not yet implemented");
}

void dumpConf(const std::string& path) override {
throw GlutenException("Not yet implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class NativePlanEvaluator {
private static final AtomicInteger id = new AtomicInteger(0);
private final Runtime runtime =
Runtimes.contextInstance(String.format("NativePlanEvaluator-%d", id.getAndIncrement()));

private final Runtime runtime = Runtimes.contextInstance("WholeStageIterator");
private final PlanEvaluatorJniWrapper jniWrapper;

private NativePlanEvaluator() {
Expand All @@ -46,8 +49,8 @@ public NativePlanValidationInfo doNativeValidateWithFailureReason(byte[] subPlan
return jniWrapper.nativeValidateWithFailureReason(subPlan);
}

public void injectWriteFilesTempPath(String path) {
jniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
public static void injectWriteFilesTempPath(String path) {
PlanEvaluatorJniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
}

// Used by WholeStageTransform to create the native computing pipeline and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public long rtHandle() {
return runtime.getHandle();
}

public static native void injectWriteFilesTempPath(byte[] path);

/**
* Validate the Substrait plan in native compute engine.
*
Expand All @@ -51,8 +53,6 @@ public long rtHandle() {

public native String nativePlanString(byte[] substraitPlan, Boolean details);

public native void injectWriteFilesTempPath(byte[] path);

/**
* Create a native compute kernel and return a columnar result iterator.
*
Expand Down

0 comments on commit 20f2d20

Please sign in to comment.