From 40206b8d90e2e80dd0cacb8f2a3aaacc49371ef3 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:00:32 -0700 Subject: [PATCH 1/6] Add PosixPluginFrontendSpec --- .../frontend/PosixPluginFrontendSpec.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala new file mode 100644 index 0000000..2c498e5 --- /dev/null +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -0,0 +1,51 @@ +package protocbridge.frontend + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import protocbridge.{ExtraEnv, ProtocCodeGenerator} + +import java.io.ByteArrayOutputStream +import scala.sys.process.ProcessIO +import scala.util.Random + +class PosixPluginFrontendSpec extends AnyFlatSpec with Matchers { + if (!PluginFrontend.isWindows) { + it must "execute a program that forwards input and output to given stream" in { + val random = new Random() + val toSend = Array.fill(123)(random.nextInt(256).toByte) + val toReceive = Array.fill(456)(random.nextInt(256).toByte) + val env = new ExtraEnv(secondaryOutputDir = "tmp") + + val fakeGenerator = new ProtocCodeGenerator { + override def run(request: Array[Byte]): Array[Byte] = { + request mustBe (toSend ++ env.toByteArrayAsField) + toReceive + } + } + val (path, state) = PosixPluginFrontend.prepare( + fakeGenerator, + env + ) + val actualOutput = new ByteArrayOutputStream() + val process = sys.process + .Process(path.toAbsolutePath.toString) + .run(new ProcessIO(writeInput => { + writeInput.write(toSend) + writeInput.close() + }, processOutput => { + val buffer = new Array[Byte](4096) + var bytesRead = 0 + while (bytesRead != -1) { + bytesRead = processOutput.read(buffer) + if (bytesRead != -1) { + actualOutput.write(buffer, 0, bytesRead) + } + } + processOutput.close() + }, _.close())) + process.exitValue() + actualOutput.toByteArray mustBe toReceive + PosixPluginFrontend.cleanup(state) + } + } +} From 3041aed5b04c30aeb96574eeb294287fd9ddfb05 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:00:38 -0700 Subject: [PATCH 2/6] Use IOUtils in PosixPluginFrontendSpec --- .../frontend/PosixPluginFrontendSpec.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala index 2c498e5..e86f922 100644 --- a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -1,5 +1,6 @@ package protocbridge.frontend +import org.apache.commons.io.IOUtils import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.must.Matchers import protocbridge.{ExtraEnv, ProtocCodeGenerator} @@ -33,14 +34,7 @@ class PosixPluginFrontendSpec extends AnyFlatSpec with Matchers { writeInput.write(toSend) writeInput.close() }, processOutput => { - val buffer = new Array[Byte](4096) - var bytesRead = 0 - while (bytesRead != -1) { - bytesRead = processOutput.read(buffer) - if (bytesRead != -1) { - actualOutput.write(buffer, 0, bytesRead) - } - } + IOUtils.copy(processOutput, actualOutput) processOutput.close() }, _.close())) process.exitValue() From f41355446f65d3208b0378aa14b94dfd2cf628fc Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:02:56 -0700 Subject: [PATCH 3/6] Stress test PosixPluginFrontendSpec --- .../frontend/PosixPluginFrontendSpec.scala | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala index e86f922..0bdbd1e 100644 --- a/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala +++ b/bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala @@ -23,23 +23,30 @@ class PosixPluginFrontendSpec extends AnyFlatSpec with Matchers { toReceive } } - val (path, state) = PosixPluginFrontend.prepare( - fakeGenerator, - env - ) - val actualOutput = new ByteArrayOutputStream() - val process = sys.process - .Process(path.toAbsolutePath.toString) - .run(new ProcessIO(writeInput => { - writeInput.write(toSend) - writeInput.close() - }, processOutput => { - IOUtils.copy(processOutput, actualOutput) - processOutput.close() - }, _.close())) - process.exitValue() - actualOutput.toByteArray mustBe toReceive - PosixPluginFrontend.cleanup(state) + + // Repeat 10,000 times since named pipes on macOS are flaky. + val repeatCount = 10000 + for (i <- 1 to repeatCount) { + if (i % 100 == 1) println(s"Running iteration $i of $repeatCount") + + val (path, state) = PosixPluginFrontend.prepare( + fakeGenerator, + env + ) + val actualOutput = new ByteArrayOutputStream() + val process = sys.process + .Process(path.toAbsolutePath.toString) + .run(new ProcessIO(writeInput => { + writeInput.write(toSend) + writeInput.close() + }, processOutput => { + IOUtils.copy(processOutput, actualOutput) + processOutput.close() + }, _.close())) + process.exitValue() + actualOutput.toByteArray mustBe toReceive + PosixPluginFrontend.cleanup(state) + } } } } From 86d9d17488931a8aef3e4534f07fedf2ef32617c Mon Sep 17 00:00:00 2001 From: Bell Le Date: Mon, 22 Jul 2024 15:28:32 -0700 Subject: [PATCH 4/6] Log all failures in Future --- .../protocbridge/frontend/PluginFrontend.scala | 18 ++++++++++-------- .../frontend/PosixPluginFrontend.scala | 8 ++++++++ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala index 7415f06..0824a44 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala @@ -47,13 +47,7 @@ object PluginFrontend { gen: ProtocCodeGenerator, request: Array[Byte] ): Array[Byte] = { - Try { - gen.run(request) - }.recover { case throwable => - createCodeGeneratorResponseWithError( - throwable.toString + "\n" + getStackTrace(throwable) - ) - }.get + gen.run(request) } def createCodeGeneratorResponseWithError(error: String): Array[Byte] = { @@ -116,9 +110,17 @@ object PluginFrontend { gen: ProtocCodeGenerator, fsin: InputStream, env: ExtraEnv - ): Array[Byte] = { + ): Array[Byte] = try { + System.err.println("readInputStreamToByteArrayWithEnv...") val bytes = readInputStreamToByteArrayWithEnv(fsin, env) + System.err.println("runWithBytes...") runWithBytes(gen, bytes) + } catch { + case throwable: Throwable => + System.err.println("createCodeGeneratorResponseWithError...") + createCodeGeneratorResponseWithError( + throwable.toString + "\n" + getStackTrace(throwable) + ) } def createTempFile(extension: String, content: String): Path = { diff --git a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala index 5f70120..ac5188b 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala @@ -36,13 +36,21 @@ object PosixPluginFrontend extends PluginFrontend { Future { blocking { + System.err.println("Files.newInputStream...") val fsin = Files.newInputStream(inputPipe) + System.err.println("PluginFrontend.runWithInputStream...") val response = PluginFrontend.runWithInputStream(plugin, fsin, env) + System.err.println("fsin.close...") fsin.close() + System.err.println("Files.newOutputStream...") val fsout = Files.newOutputStream(outputPipe) + System.err.println("fsout.write...") fsout.write(response) + System.err.println("fsout.close...") fsout.close() + + System.err.println("blocking done.") } } (sh, InternalState(inputPipe, outputPipe, tempDirPath, sh)) From f828ab64bb57763169280bc987f552f248059f81 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Tue, 23 Jul 2024 17:26:15 -0700 Subject: [PATCH 5/6] More logging --- .../frontend/PluginFrontend.scala | 23 +++++---- .../frontend/PosixPluginFrontend.scala | 47 ++++++++++++++----- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala index 0824a44..0d13632 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala @@ -47,7 +47,16 @@ object PluginFrontend { gen: ProtocCodeGenerator, request: Array[Byte] ): Array[Byte] = { - gen.run(request) + // Use try-catch to handle all Throwable including OutOfMemoryError, StackOverflowError, etc. + try { + gen.run(request) + } catch { + case throwable: Throwable => + System.err.println("createCodeGeneratorResponseWithError...") + createCodeGeneratorResponseWithError( + throwable.toString + "\n" + getStackTrace(throwable) + ) + } } def createCodeGeneratorResponseWithError(error: String): Array[Byte] = { @@ -110,17 +119,11 @@ object PluginFrontend { gen: ProtocCodeGenerator, fsin: InputStream, env: ExtraEnv - ): Array[Byte] = try { - System.err.println("readInputStreamToByteArrayWithEnv...") + ): Array[Byte] = { +// System.err.println("readInputStreamToByteArrayWithEnv...") val bytes = readInputStreamToByteArrayWithEnv(fsin, env) - System.err.println("runWithBytes...") +// System.err.println("runWithBytes...") runWithBytes(gen, bytes) - } catch { - case throwable: Throwable => - System.err.println("createCodeGeneratorResponseWithError...") - createCodeGeneratorResponseWithError( - throwable.toString + "\n" + getStackTrace(throwable) - ) } def createTempFile(extension: String, content: String): Path = { diff --git a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala index ac5188b..90d22a6 100644 --- a/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala +++ b/bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala @@ -36,21 +36,42 @@ object PosixPluginFrontend extends PluginFrontend { Future { blocking { - System.err.println("Files.newInputStream...") - val fsin = Files.newInputStream(inputPipe) - System.err.println("PluginFrontend.runWithInputStream...") - val response = PluginFrontend.runWithInputStream(plugin, fsin, env) - System.err.println("fsin.close...") - fsin.close() + try { +// System.err.println("Files.newInputStream...") + val fsin = Files.newInputStream(inputPipe) +// System.err.println("PluginFrontend.runWithInputStream...") + val response = PluginFrontend.runWithInputStream(plugin, fsin, env) +// System.err.println("fsin.close...") + fsin.close() - System.err.println("Files.newOutputStream...") - val fsout = Files.newOutputStream(outputPipe) - System.err.println("fsout.write...") - fsout.write(response) - System.err.println("fsout.close...") - fsout.close() +// System.err.println("Files.newOutputStream...") + val fsout = Files.newOutputStream(outputPipe) +// System.err.println("fsout.write...") + fsout.write(response) +// System.err.println("fsout.close...") + fsout.close() - System.err.println("blocking done.") +// System.err.println("blocking done.") + } catch { + case e: Throwable => + // Handles rare exceptions not already gracefully handled in `runWithBytes`. + // Such exceptions aren't converted to `CodeGeneratorResponse` + // because `fsin` might not be fully consumed, + // therefore the plugin shell script might hang on `inputPipe`, + // and never consume `CodeGeneratorResponse`. + System.err.println("Exception occurred in PluginFrontend outside runWithBytes") + e.printStackTrace(System.err) + // Force an exit of the program. + // This is because the plugin shell script might hang on `inputPipe`, + // due to `fsin` not fully consumed. + // Or it might hang on `outputPipe`, due to `fsout` not closed. + // We can't simply close `fsout` here either, + // because `Files.newOutputStream(outputPipe)` will hang + // if `outputPipe` is not yet opened by the plugin shell script for reading. + // Therefore, the program might be stuck waiting for protoc, + // which in turn is waiting for the plugin shell script. + sys.exit(1) + } } } (sh, InternalState(inputPipe, outputPipe, tempDirPath, sh)) From eea0ae0d735760f71004a795c46f06b8b92b7321 Mon Sep 17 00:00:00 2001 From: Bell Le Date: Fri, 9 Aug 2024 10:11:25 -0700 Subject: [PATCH 6/6] Add pipe_stress_test.sh --- dd.py | 24 ++++++++++++ pipe_stress_test.sh | 96 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 dd.py create mode 100755 pipe_stress_test.sh diff --git a/dd.py b/dd.py new file mode 100644 index 0000000..ae0ee30 --- /dev/null +++ b/dd.py @@ -0,0 +1,24 @@ +import sys +import time + +total_bytes = 0 +input_file = sys.argv[1] if len(sys.argv) > 1 else "/dev/stdin" +output_file = sys.argv[2] if len(sys.argv) > 2 else "/dev/stdout" + +sys.stderr.write(f"[{time.time_ns()}][dd.py] Opening input stream\n") +with open(input_file, "rb") as input_stream: + sys.stderr.write(f"[{time.time_ns()}][dd.py] Opened input stream\n") + + sys.stderr.write(f"[{time.time_ns()}][dd.py] Opening output stream\n") + with open(output_file, "wb") as output_stream: + sys.stderr.write(f"[{time.time_ns()}][dd.py] Opened output stream\n") + + while True: + chunk = input_stream.read(4096) + if not chunk: + break + output_stream.write(chunk) + total_bytes += len(chunk) + sys.stderr.write(f"[{time.time_ns()}][dd.py] Transferred {len(chunk)} bytes, total {total_bytes}\n") + +sys.stderr.write(f"[{time.time_ns()}][dd.py] Transferred total {total_bytes} bytes\n") diff --git a/pipe_stress_test.sh b/pipe_stress_test.sh new file mode 100755 index 0000000..4ec733a --- /dev/null +++ b/pipe_stress_test.sh @@ -0,0 +1,96 @@ +#!/usr/bin/env bash + +BYTE_LENGTH="$1" +READER_MODE="${2:-dd}" +WRITER_MODE="${3:-dd}" + +TEST_FILE="$(mktemp -t protopipe)" +dd if=/dev/urandom of="$TEST_FILE" bs=1 count="$BYTE_LENGTH" 2>/dev/null + +test_pipe() { + # Create a unique temporary directory for the pipe + PIPE_DIR=$(mktemp -d -t protopipe) + PIPE_PATH="$PIPE_DIR/output" + PIPE_DATA_PATH="$PIPE_DIR/pipe_data" + + # Create the named pipe + mkfifo "$PIPE_PATH" + echo "Created named pipe at $PIPE_PATH" + + # Start monitoring the pipe using fs_usage in the background + # sudo fs_usage -w | grep "$PIPE_PATH" & + # MONITOR_PID=$! + # echo "Started monitoring the pipe $PIPE_PATH (PID: $MONITOR_PID)" + + # Start dumping random bytes to the pipe in the background + if [[ "$READER_MODE" == "dd" ]]; then + (dd if="$TEST_FILE" of="$PIPE_PATH" 2>/dev/null && echo "Completed dumping random bytes to the pipe") & + elif [[ "$READER_MODE" == "dd.py" ]]; then + (python3 "$(dirname "$0")"/dd.py "$TEST_FILE" "$PIPE_PATH" && echo "Completed dumping random bytes to the pipe") & + elif [[ "$READER_MODE" == "cat" ]]; then + (cat "$TEST_FILE" > "$PIPE_PATH" && echo "Completed dumping random bytes to the pipe") & + else + echo "Invalid reader mode: $READER_MODE" + exit 1 + fi + DUMP_PID=$! + echo "Started dumping random bytes to the pipe (PID: $DUMP_PID)" + + # Randomize the sleep duration + SLEEP_DURATION=$((RANDOM % 100)) + echo "Sleeping for: $SLEEP_DURATION milliseconds" + sleep "$(echo "scale=3; $SLEEP_DURATION/1000" | bc)" + + # Start a process to consume the data from the pipe + if [[ "$WRITER_MODE" == "dd" ]]; then + (dd if="$PIPE_PATH" of="$PIPE_DATA_PATH" 2>/dev/null && echo "Completed consuming random bytes from the pipe") & + elif [[ "$WRITER_MODE" == "dd.py" ]]; then + (python3 "$(dirname "$0")"/dd.py "$PIPE_PATH" "$PIPE_DATA_PATH" && echo "Completed consuming random bytes from the pipe") & + elif [[ "$WRITER_MODE" == "cat" ]]; then + (cat "$PIPE_PATH" > "$PIPE_DATA_PATH" && echo "Completed consuming random bytes from the pipe") & + else + echo "Invalid writer mode: $WRITER_MODE" + exit 1 + fi + CONSUME_PID=$! + echo "Started consuming data from the pipe (PID: $CONSUME_PID)" + + # Ensure the dumping process is killed + wait $DUMP_PID 2>/dev/null + echo "The dumping process has stopped (PID: $DUMP_PID)" + + # Ensure the consuming process is killed + wait $CONSUME_PID 2>/dev/null + echo "The consuming process has stopped (PID: $CONSUME_PID)" + + # Stop the monitoring + # kill $MONITOR_PID 2>/dev/null + # wait $MONITOR_PID 2>/dev/null + # echo "Stopped monitoring the pipe (PID: $MONITOR_PID)" + + # Check the size of the data read from the pipe + DATA_SIZE=$(wc -c < "$PIPE_DATA_PATH") + if [ "$DATA_SIZE" -ne "$BYTE_LENGTH" ]; then + echo "Error: Expected $BYTE_LENGTH bytes, but read $DATA_SIZE bytes" + exit 1 + else + echo "Successfully read $BYTE_LENGTH bytes from the pipe" + fi + + # Remove the pipe + rm "$PIPE_PATH" + rm "$PIPE_DATA_PATH" + + # Remove the temporary directory + rmdir "$PIPE_DIR" +} + +# Kill existing fs_usage instances +# sudo pkill fs_usage + +# Repeat the process +counter=0; +while test_pipe; do + ((counter++)); echo "Iterations completed: $counter"; +done +echo "Command failed after $counter successful iterations."