Skip to content

Commit

Permalink
Handle all failures in Future
Browse files Browse the repository at this point in the history
  • Loading branch information
bell-db committed Aug 16, 2024
1 parent 3041aed commit 6f0dfdb
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 75 deletions.
18 changes: 10 additions & 8 deletions bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,7 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
request: Array[Byte]
): Array[Byte] = {
Try {
gen.run(request)
}.recover { case throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}.get
gen.run(request)
}

def createCodeGeneratorResponseWithError(error: String): Array[Byte] = {
Expand Down Expand Up @@ -116,9 +110,17 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
fsin: InputStream,
env: ExtraEnv
): Array[Byte] = {
): Array[Byte] = try {
val bytes = readInputStreamToByteArrayWithEnv(fsin, env)
runWithBytes(gen, bytes)
} catch {
// This covers all Throwable including OutOfMemoryError, StackOverflowError, etc.
// We need to make a best effort to return a response to protoc,
// otherwise protoc can hang indefinitely.
case throwable: Throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}

def createTempFile(extension: String, content: String): Path = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ object PosixPluginFrontend extends PluginFrontend {
val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
fsin.close()

// Note that the output pipe must be opened after the input pipe is closed.
// Otherwise, there might be a deadlock if the shell script is stuck in input pipe and
// doesn't open the other end of the output pipe.
val fsout = Files.newOutputStream(outputPipe)
fsout.write(response)
fsout.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.sys.process.ProcessIO
import scala.util.Random

class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {

protected def testPluginFrontend(
frontend: PluginFrontend,
generator: ProtocCodeGenerator,
env: ExtraEnv,
request: Array[Byte]
): Array[Byte] = {
val (path, state) = frontend.prepare(
generator,
env
)
val actualOutput = new ByteArrayOutputStream()
val process = sys.process
.Process(path.toAbsolutePath.toString)
.run(new ProcessIO(writeInput => {
writeInput.write(request)
writeInput.close()
}, processOutput => {
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
}, _.close()))
process.exitValue()
frontend.cleanup(state)
actualOutput.toByteArray
}

protected def testSuccess(frontend: PluginFrontend): Unit = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend ++ env.toByteArrayAsField)
toReceive
}
}
val response = testPluginFrontend(frontend, fakeGenerator, env, toSend)
response mustBe toReceive
}

protected def testFailure(frontend: PluginFrontend): Unit = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
throw new OutOfMemoryError("test error")
}
}
val response = testPluginFrontend(frontend, fakeGenerator, env, toSend)
response.length must be > 0
}
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,13 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.sys.process.ProcessIO
import scala.util.Random

class PosixPluginFrontendSpec extends AnyFlatSpec with Matchers {
class PosixPluginFrontendSpec extends OsSpecificFrontendSpec {
if (!PluginFrontend.isWindows) {
it must "execute a program that forwards input and output to given stream" in {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")
testSuccess(PosixPluginFrontend)
}

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend ++ env.toByteArrayAsField)
toReceive
}
}
val (path, state) = PosixPluginFrontend.prepare(
fakeGenerator,
env
)
val actualOutput = new ByteArrayOutputStream()
val process = sys.process
.Process(path.toAbsolutePath.toString)
.run(new ProcessIO(writeInput => {
writeInput.write(toSend)
writeInput.close()
}, processOutput => {
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
}, _.close()))
process.exitValue()
actualOutput.toByteArray mustBe toReceive
PosixPluginFrontend.cleanup(state)
it must "not hang if there is an error in generator" in {
testFailure(PosixPluginFrontend)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,13 @@
package protocbridge.frontend

import java.io.ByteArrayInputStream

import protocbridge.{ProtocCodeGenerator, ExtraEnv}

import scala.sys.process.ProcessLogger
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers

class WindowsPluginFrontendSpec extends AnyFlatSpec with Matchers {
class WindowsPluginFrontendSpec extends OsSpecificFrontendSpec {
if (PluginFrontend.isWindows) {
it must "execute a program that forwards input and output to given stream" in {
val toSend = "ping"
val toReceive = "pong"
val env = new ExtraEnv(secondaryOutputDir = "tmp")
testSuccess(WindowsPluginFrontend)
}

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend.getBytes ++ env.toByteArrayAsField)
toReceive.getBytes
}
}
val (path, state) = WindowsPluginFrontend.prepare(
fakeGenerator,
env
)
val actualOutput = scala.collection.mutable.Buffer.empty[String]
val process = sys.process
.Process(path.toAbsolutePath.toString)
.#<(new ByteArrayInputStream(toSend.getBytes))
.run(ProcessLogger(o => actualOutput.append(o)))
process.exitValue()
actualOutput.mkString mustBe toReceive
WindowsPluginFrontend.cleanup(state)
it must "not hang if there is an error in generator" in {
testFailure(WindowsPluginFrontend)
}
}
}

0 comments on commit 6f0dfdb

Please sign in to comment.