Skip to content

Commit

Permalink
Handle all failures in Future
Browse files Browse the repository at this point in the history
  • Loading branch information
bell-db committed Aug 20, 2024
1 parent 34606e3 commit 07ee8a7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 24 deletions.
18 changes: 10 additions & 8 deletions bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
request: Array[Byte]
): Array[Byte] = {
Try {
gen.run(request)
}.recover { case throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}.get
gen.run(request)
}

def createCodeGeneratorResponseWithError(error: String): Array[Byte] = {
Expand Down Expand Up @@ -116,9 +110,17 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
fsin: InputStream,
env: ExtraEnv
): Array[Byte] = {
): Array[Byte] = try {
val bytes = readInputStreamToByteArrayWithEnv(fsin, env)
runWithBytes(gen, bytes)
} catch {
// This covers all Throwable including OutOfMemoryError, StackOverflowError, etc.
// We need to make a best effort to return a response to protoc,
// otherwise protoc can hang indefinitely.
case throwable: Throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}

def createTempFile(extension: String, content: String): Path = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ object PosixPluginFrontend extends PluginFrontend {
val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
fsin.close()

// Note that the output pipe must be opened after the input pipe is consumed.
// Otherwise, there might be a deadlock that
// - The shell script is stuck writing to the input pipe (which has a full buffer),
// and doesn't open the write end of the output pipe.
// - This thread is stuck waiting for the write end of the output pipe to be opened.
val fsout = Files.newOutputStream(outputPipe)
fsout.write(response)
fsout.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,21 @@ import scala.util.Random

class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {

protected def testPluginFrontend(frontend: PluginFrontend): Array[Byte] = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend ++ env.toByteArrayAsField)
toReceive
}
}
protected def testPluginFrontend(
frontend: PluginFrontend,
generator: ProtocCodeGenerator,
env: ExtraEnv,
request: Array[Byte]
): Array[Byte] = {
val (path, state) = frontend.prepare(
fakeGenerator,
generator,
env
)
val actualOutput = new ByteArrayOutputStream()
val process = sys.process
.Process(path.toAbsolutePath.toString)
.run(new ProcessIO(writeInput => {
writeInput.write(toSend)
writeInput.write(request)
writeInput.close()
}, processOutput => {
val buffer = new Array[Byte](4096)
Expand All @@ -48,4 +42,34 @@ class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {
frontend.cleanup(state)
actualOutput.toByteArray
}

protected def testSuccess(frontend: PluginFrontend): Unit = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend ++ env.toByteArrayAsField)
toReceive
}
}
val response = testPluginFrontend(frontend, fakeGenerator, env, toSend)
response mustBe toReceive
}

protected def testFailure(frontend: PluginFrontend): Unit = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
throw new OutOfMemoryError("test error")
}
}
val response = testPluginFrontend(frontend, fakeGenerator, env, toSend)
response.length must be > 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package protocbridge.frontend
class PosixPluginFrontendSpec extends OsSpecificFrontendSpec {
if (!PluginFrontend.isWindows) {
it must "execute a program that forwards input and output to given stream" in {
testPluginFrontend(PosixPluginFrontend)
testSuccess(PosixPluginFrontend)
}

it must "not hang if there is an OOM in generator" in {
testFailure(PosixPluginFrontend)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package protocbridge.frontend
class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec {
if (PluginFrontend.isWindows) {
it must "execute a program that forwards input and output to given stream" in {
testPluginFrontend(WindowsPluginFrontend)
testSuccess(WindowsPluginFrontend)
}

it must "not hang if there is an OOM in generator" in {
testFailure(WindowsPluginFrontend)
}
}
}

0 comments on commit 07ee8a7

Please sign in to comment.