Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST][NO-MERGE] Stress test sockets #381

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package protocbridge.frontend

import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{Files, Path}
import java.{util => ju}

/** PluginFrontend for macOS.
*
* Creates a server socket and uses `nc` to communicate with the socket. We use
* a server socket instead of named pipes because named pipes are unreliable on
* macOS: https://github.com/scalapb/protoc-bridge/issues/366. Since `nc` is
* widely available on macOS, this is the simplest and most reliable solution
* for macOS.
*/
object MacPluginFrontend extends SocketBasedPluginFrontend {

protected def createShellScript(port: Int): Path = {
val shell = sys.env.getOrElse("PROTOCBRIDGE_SHELL", "/bin/sh")
// We use 127.0.0.1 instead of localhost for the (very unlikely) case that localhost is missing from /etc/hosts.
val scriptName = PluginFrontend.createTempFile(
"",
s"""|#!$shell
|set -e
|nc 127.0.0.1 $port
""".stripMargin
)
val perms = new ju.HashSet[PosixFilePermission]
perms.add(PosixFilePermission.OWNER_EXECUTE)
perms.add(PosixFilePermission.OWNER_READ)
Files.setPosixFilePermissions(
scriptName,
perms
)
scriptName
}
}
25 changes: 15 additions & 10 deletions bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import java.nio.file.{Files, Path}

import protocbridge.{ProtocCodeGenerator, ExtraEnv}

import scala.util.Try

/** A PluginFrontend instance provides a platform-dependent way for protoc to
* communicate with a JVM based ProtocCodeGenerator.
*
Expand Down Expand Up @@ -47,13 +45,7 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
request: Array[Byte]
): Array[Byte] = {
Try {
gen.run(request)
}.recover { case throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}.get
gen.run(request)
}

def createCodeGeneratorResponseWithError(error: String): Array[Byte] = {
Expand Down Expand Up @@ -116,9 +108,17 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
fsin: InputStream,
env: ExtraEnv
): Array[Byte] = {
): Array[Byte] = try {
val bytes = readInputStreamToByteArrayWithEnv(fsin, env)
runWithBytes(gen, bytes)
} catch {
// This covers all Throwable including OutOfMemoryError, StackOverflowError, etc.
// We need to make a best effort to return a response to protoc,
// otherwise protoc can hang indefinitely.
case throwable: Throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}

def createTempFile(extension: String, content: String): Path = {
Expand All @@ -131,8 +131,13 @@ object PluginFrontend {

def isWindows: Boolean = sys.props("os.name").startsWith("Windows")

def isMac: Boolean = sys.props("os.name").startsWith("Mac") || sys
.props("os.name")
.startsWith("Darwin")

def newInstance: PluginFrontend = {
if (isWindows) WindowsPluginFrontend
else if (isMac) MacPluginFrontend
else PosixPluginFrontend
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process._
import java.{util => ju}

/** PluginFrontend for Unix-like systems (Linux, Mac, etc)
/** PluginFrontend for Unix-like systems <b>except macOS</b> (Linux, FreeBSD,
* etc)
*
* Creates a pair of named pipes for input/output and a shell script that
* communicates with them.
* communicates with them. Compared with `SocketBasedPluginFrontend`, this
* frontend doesn't rely on `nc` that might not be available in some
* distributions.
*/
object PosixPluginFrontend extends PluginFrontend {
case class InternalState(
Expand All @@ -40,6 +43,11 @@ object PosixPluginFrontend extends PluginFrontend {
val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
fsin.close()

// Note that the output pipe must be opened after the input pipe is consumed.
// Otherwise, there might be a deadlock that
// - The shell script is stuck writing to the input pipe (which has a full buffer),
// and doesn't open the write end of the output pipe.
// - This thread is stuck waiting for the write end of the output pipe to be opened.
val fsout = Files.newOutputStream(outputPipe)
fsout.write(response)
fsout.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package protocbridge.frontend

import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.net.ServerSocket
import java.nio.file.{Files, Path}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

/** PluginFrontend for Windows and macOS where a server socket is used.
*/
abstract class SocketBasedPluginFrontend extends PluginFrontend {
case class InternalState(serverSocket: ServerSocket, shellScript: Path)

override def prepare(
plugin: ProtocCodeGenerator,
env: ExtraEnv
): (Path, InternalState) = {
val ss = new ServerSocket(0) // Bind to any available port.
val sh = createShellScript(ss.getLocalPort)

Future {
blocking {
// Accept a single client connection from the shell script.
val client = ss.accept()
try {
val response =
PluginFrontend.runWithInputStream(
plugin,
client.getInputStream,
env
)
client.getOutputStream.write(response)
} finally {
client.close()
}
}
}

(sh, InternalState(ss, sh))
}

override def cleanup(state: InternalState): Unit = {
state.serverSocket.close()
if (sys.props.get("protocbridge.debug") != Some("1")) {
Files.delete(state.shellScript)
}
}

protected def createShellScript(port: Int): Path
}
Original file line number Diff line number Diff line change
@@ -1,53 +1,15 @@
package protocbridge.frontend

import java.net.ServerSocket
import java.nio.file.{Files, Path, Paths}

import protocbridge.ExtraEnv
import protocbridge.ProtocCodeGenerator

import scala.concurrent.blocking

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import java.nio.file.{Path, Paths}

/** A PluginFrontend that binds a server socket to a local interface. The plugin
* is a batch script that invokes BridgeApp.main() method, in a new JVM with
* the same parameters as the currently running JVM. The plugin will
* communicate its stdin and stdout to this socket.
*/
object WindowsPluginFrontend extends PluginFrontend {

case class InternalState(batFile: Path)

override def prepare(
plugin: ProtocCodeGenerator,
env: ExtraEnv
): (Path, InternalState) = {
val ss = new ServerSocket(0)
val state = createWindowsScript(ss.getLocalPort)

Future {
blocking {
val client = ss.accept()
val response =
PluginFrontend.runWithInputStream(plugin, client.getInputStream, env)
client.getOutputStream.write(response)
client.close()
ss.close()
}
}

(state.batFile, state)
}

override def cleanup(state: InternalState): Unit = {
if (sys.props.get("protocbridge.debug") != Some("1")) {
Files.delete(state.batFile)
}
}
object WindowsPluginFrontend extends SocketBasedPluginFrontend {

private def createWindowsScript(port: Int): InternalState = {
protected def createShellScript(port: Int): Path = {
val classPath =
Paths.get(getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
val classPathBatchString = classPath.toString.replace("%", "%%")
Expand All @@ -62,6 +24,6 @@ object WindowsPluginFrontend extends PluginFrontend {
].getName} $port
""".stripMargin
)
InternalState(batchFile)
batchFile
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package protocbridge.frontend

class MacPluginFrontendSpec extends OsSpecificFrontendSpec {
if (PluginFrontend.isMac) {
it must "execute a program that forwards input and output to given stream" in {
val state = testSuccess(MacPluginFrontend)
state.serverSocket.isClosed mustBe true
}

it must "not hang if there is an error in generator" in {
val state = testFailure(MacPluginFrontend)
state.serverSocket.isClosed mustBe true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package protocbridge.frontend

import org.apache.commons.io.IOUtils
import org.scalatest.exceptions.TestFailedException
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.must.Matchers
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.io.ByteArrayOutputStream
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future, TimeoutException}
import scala.sys.process.ProcessIO
import scala.util.Random

class OsSpecificFrontendSpec extends AnyFlatSpec with Matchers {

protected def testPluginFrontend(
frontend: PluginFrontend,
generator: ProtocCodeGenerator,
env: ExtraEnv,
request: Array[Byte]
): (frontend.InternalState, Array[Byte]) = {
val (path, state) = frontend.prepare(
generator,
env
)
val actualOutput = new ByteArrayOutputStream()
val process = sys.process
.Process(path.toAbsolutePath.toString)
.run(
new ProcessIO(
writeInput => {
writeInput.write(request)
writeInput.close()
},
processOutput => {
IOUtils.copy(processOutput, actualOutput)
processOutput.close()
},
processError => {
IOUtils.copy(processError, System.err)
processError.close()
}
)
)
try {
Await.result(Future { process.exitValue() }, 5.seconds)
} catch {
case _: TimeoutException =>
System.err.println(s"Timeout")
process.destroy()
}
frontend.cleanup(state)
(state, actualOutput.toByteArray)
}

protected def testSuccess(
frontend: PluginFrontend
): frontend.InternalState = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
request mustBe (toSend ++ env.toByteArrayAsField)
toReceive
}
}
// Repeat 100,000 times since named pipes on macOS are flaky.
val repeatCount = 100000
for (i <- 1 until repeatCount) {
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount")
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(s"""Failed on iteration $i of $repeatCount: ${e.getMessage}""")
}
}
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
try {
response mustBe toReceive
} catch {
case e: TestFailedException =>
System.err.println(s"""Failed on iteration $repeatCount of $repeatCount: ${e.getMessage}""")
}
state
}

protected def testFailure(
frontend: PluginFrontend
): frontend.InternalState = {
val random = new Random()
val toSend = Array.fill(123)(random.nextInt(256).toByte)
val env = new ExtraEnv(secondaryOutputDir = "tmp")

val fakeGenerator = new ProtocCodeGenerator {
override def run(request: Array[Byte]): Array[Byte] = {
throw new OutOfMemoryError("test error")
}
}
val (state, response) =
testPluginFrontend(frontend, fakeGenerator, env, toSend)
response.length must be > 0
state
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package protocbridge.frontend

class PosixPluginFrontendSpec extends OsSpecificFrontendSpec {
if (!PluginFrontend.isWindows && !PluginFrontend.isMac) {
it must "execute a program that forwards input and output to given stream" in {
testSuccess(MacPluginFrontend)
}

it must "not hang if there is an OOM in generator" in {
testFailure(MacPluginFrontend)
}
}
}
Loading