Skip to content

Commit

Permalink
Fix testcontainers after clean or reload
Browse files Browse the repository at this point in the history
  • Loading branch information
adpi2 committed Feb 27, 2024
1 parent 2a2cdb3 commit aa8fc4e
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 110 deletions.
8 changes: 2 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ lazy val infra = project
"io.circe" %% "circe-parser"
).map(_ % V.circe),
Elasticsearch.settings(defaultPort = 9200),
inConfig(Compile)(
Postgres.settings(defaultPort = 5432, database = "scaladex")
),
Postgres.settings(Compile, defaultPort = 5432, database = "scaladex"),
javaOptions ++= {
val base = (ThisBuild / baseDirectory).value
val index = base / "small-index"
Expand All @@ -105,9 +103,7 @@ lazy val infra = project
s"-Dscaladex.elasticsearch.port=$elasticsearchPort"
)
},
inConfig(Test)(
Postgres.settings(defaultPort = 5432, database = "scaladex-test")
),
Postgres.settings(Test, defaultPort = 5432, database = "scaladex-test"),
Test / javaOptions ++= {
val elasticsearchPort = startElasticsearch.value
val postgresPort = (Test / startPostgres).value
Expand Down
15 changes: 15 additions & 0 deletions project/Docker.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import org.testcontainers.DockerClientFactory
import org.testcontainers.dockerclient.DockerClientProviderStrategy

object Docker {
lazy val client = {
CurrentThread.setContextClassLoader[DockerClientProviderStrategy]
DockerClientFactory.instance().client()
}

def kill(containerId: String): Unit =
try client.killContainerCmd(containerId).exec()
catch {
case _: Throwable => ()
}
}
90 changes: 48 additions & 42 deletions project/Elasticsearch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,55 @@ import java.net.URL
import java.io.IOException
import org.testcontainers.elasticsearch.ElasticsearchContainer
import org.testcontainers.containers.BindMode
import scala.collection.mutable
import scala.collection.concurrent.TrieMap
import java.nio.file.Path

object Elasticsearch extends AutoPlugin {
private val containers: mutable.Map[Path, ElasticsearchContainer] = TrieMap.empty

object autoImport {
val elasticsearchDefaultPort =
settingKey[Int]("Port of elasticserach instance")
val elasticsearchFolder =
settingKey[File]("Folder where elasticsearch data are stored")
val startElasticsearch = taskKey[Int](
"Chek that elasticsearch has already started or else start a container"
)
val startElasticsearch = taskKey[Int]("Connect to Elasticsearch or start an Elasticsearch container")
}

import autoImport._

def settings(defaultPort: Int): Seq[Setting[_]] = Seq(
elasticsearchDefaultPort := defaultPort,
elasticsearchFolder := Keys.baseDirectory.value / ".esdata",
startElasticsearch := {
import sbt.util.CacheImplicits._
val dataFolder = elasticsearchFolder.value
val defaultPort = elasticsearchDefaultPort.value
val dataFolder = Keys.baseDirectory.value / ".esdata"
val streams = Keys.streams.value
val store = streams.cacheStoreFactory.make("last")
val logger = streams.log
val tracker = util.Tracked.lastOutput[Unit, Int](store) {
case (_, None) =>
checkOrStart(dataFolder, defaultPort, logger)
case (_, Some(previousPort)) =>
checkOrStart(dataFolder, previousPort, logger)
if (canConnect(defaultPort)) {
logger.info(s"Elasticsearch available on port $defaultPort")
defaultPort
} else {
// we cache the container to reuse it after a reload
val store = streams.cacheStoreFactory.make("container")
val tracker = util.Tracked.lastOutput[Unit, (String, Int)](store) {
case (_, None) =>
startContainer(dataFolder, logger)
case (_, Some((containerId, port))) =>
if (canConnect(port)) {
logger.info(s"Elasticsearch container already started on port $port")
(containerId, port)
} else {
Docker.kill(containerId)
startContainer(dataFolder, logger)
}
}
tracker(())._2
}
tracker(())
},
Keys.clean := {
Keys.clean.value
val dataFolder = Keys.baseDirectory.value / ".esdata"
containers.get(dataFolder.toPath).foreach(_.close())
containers.remove(dataFolder.toPath)
}
)

private def checkOrStart(
dataFolder: File,
previousPort: Int,
logger: Logger
): Int = {
logger.info(s"Trying to connect to elasticsearch on port $previousPort")
if (alreadyStarted(previousPort)) {
logger.info(s"Elasticsearch has already started on port $previousPort")
previousPort
} else {
logger.info("Trying to start elasticsearch container")
val port = start(dataFolder)
logger.info(
s"Elasticsearch container successfully started with port $port"
)
port
}
}

private def start(dataFolder: File): Int = {
private def startContainer(dataFolder: File, logger: Logger): (String, Int) = {
if (!dataFolder.exists) IO.createDirectory(dataFolder)
IO.setPermissions(dataFolder, "rwxrwxrwx")

Expand All @@ -74,16 +69,26 @@ object Elasticsearch extends AutoPlugin {
container
.withEnv("discovery.type", "single-node")
.withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx512m")
.addFileSystemBind(
.withFileSystemBind(
dataFolder.toString,
"/usr/share/elasticsearch/data",
BindMode.READ_WRITE
)
container.start()
container.getFirstMappedPort()
val port =
try {
container.start()
container.getFirstMappedPort()
} catch {
case e: Throwable =>
container.stop()
throw e
}
logger.info(s"Ealsticsearch container started on port $port")
containers(dataFolder.toPath) = container
(container.getContainerId, port)
}

private def alreadyStarted(port: Int): Boolean = {
private def canConnect(port: Int): Boolean = {
val url = new URL(s"http://localhost:$port/")
try {
val connection = url.openConnection().asInstanceOf[HttpURLConnection]
Expand All @@ -92,6 +97,7 @@ object Elasticsearch extends AutoPlugin {
val respCode = connection.getResponseCode
if (respCode != HttpURLConnection.HTTP_OK)
throw new MessageOnlyException(s"Got response code $respCode on $url")
connection.disconnect()
true
} catch {
case _: TimeoutException | _: IOException => false
Expand Down
134 changes: 72 additions & 62 deletions project/Postgres.scala
Original file line number Diff line number Diff line change
@@ -1,70 +1,66 @@
import sbt._
import java.nio.file.Path
import java.sql.DriverManager

import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.util.Try

import org.testcontainers.dockerclient.DockerClientProviderStrategy
import org.testcontainers.utility.DockerImageName
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy

import java.sql.DriverManager
import scala.util.Try
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy
import org.testcontainers.containers.wait.strategy.WaitAllStrategy
import org.testcontainers.dockerclient.DockerClientProviderStrategy
import org.testcontainers.utility.DockerImageName
import sbt._

object Postgres extends AutoPlugin {
private val containers: mutable.Map[Path, PostgreSQLContainer[Nothing]] = TrieMap.empty

object autoImport {
val postgresDefaultPort = settingKey[Int]("Default port of postgres")
val postgresFolder =
settingKey[File]("Folder where postgres data are stored")
val postgresDatabase = settingKey[String]("Name of the postgres database")
val startPostgres = taskKey[Int](
"Chek that postgres has already started or else start a container"
)
val startPostgres = taskKey[Int]("Connect to Postgres or start a Postgres container")
}

import autoImport._

def settings(defaultPort: Int, database: String): Seq[Setting[_]] = Seq(
postgresDefaultPort := defaultPort,
postgresFolder := {
val c = Keys.configuration.?.value
val suffix = c.map(c => s"-${c.name}").getOrElse("")
Keys.baseDirectory.value / s".postgresql$suffix"
},
postgresDatabase := database,
startPostgres := {
def settings(config: Configuration, defaultPort: Int, database: String): Seq[Setting[_]] = Seq(
config / startPostgres := {
import sbt.util.CacheImplicits._
val dataFolder = postgresFolder.value
val defaultPort = postgresDefaultPort.value
val database = postgresDatabase.value
val dataFolder = Keys.baseDirectory.value / s".postgresql-${config.name}"
val streams = Keys.streams.value
val store = streams.cacheStoreFactory.make("last")
val logger = streams.log
val tracker = util.Tracked.lastOutput[Unit, Int](store) {
case (_, None) =>
checkOrStart(dataFolder, defaultPort, database, logger)
case (_, Some(previousPort)) =>
checkOrStart(dataFolder, previousPort, database, logger)

if (canConnect(defaultPort, database)) {
logger.info(s"Postgres is available on port $defaultPort")
defaultPort
} else {
// we cache the container to reuse it after a reload
val store = streams.cacheStoreFactory.make("container")
val tracker = util.Tracked.lastOutput[Unit, (String, Int)](store) {
case (_, None) =>
startContainer(dataFolder, database, logger)
case (_, Some((containerId, port))) =>
if (canConnect(port, database)) {
logger.info(s"Postgres container already started on port $port")
(containerId, port)
} else {
Docker.kill(containerId)
startContainer(dataFolder, database, logger)
}
}
tracker(())._2
}
tracker(())
},
Keys.clean := {
Keys.clean.value
val dataFolder = Keys.baseDirectory.value / s".postgresql-${config.name}"
containers.get(dataFolder.toPath).foreach(_.close())
containers.remove(dataFolder.toPath)
}
)

private def checkOrStart(
dataFolder: File,
previousPort: Int,
database: String,
logger: Logger
): Int =
if (alreadyStarted(previousPort, database, logger)) {
logger.info(s"Postgres has already started on port $previousPort")
previousPort
} else {
logger.info("Trying to start postgres container")
val port = start(dataFolder, database)
logger.info(s"Postgres container successfully started with port $port")
port
}

private def start(dataFolder: File, database: String): Int = {
private def startContainer(dataFolder: File, database: String, logger: Logger): (String, Int) = {
if (!dataFolder.exists) IO.createDirectory(dataFolder)
IO.setPermissions(dataFolder, "rwxrwxrwx")

Expand All @@ -74,35 +70,49 @@ object Postgres extends AutoPlugin {
val container = new PostgreSQLContainer(dockerImage)

// change the wait strategy because of https://github.com/testcontainers/testcontainers-java/issues/455
val waitStrategy = new HostPortWaitStrategy()
container.setWaitStrategy(waitStrategy)
// and https://github.com/testcontainers/testcontainers-java/issues/3372
val hostPort = new HostPortWaitStrategy()
val logMessage = new LogMessageWaitStrategy().withRegEx(".*database system is ready to accept connections.*")
val portAndMessage = new WaitAllStrategy().withStrategy(hostPort).withStrategy(logMessage)
container.waitingFor(portAndMessage)

container.withDatabaseName(database)
container.withUsername("user")
container.withPassword("password")
container.withEnv("PGDATA", "/usr/share/postgres/data")
container.addFileSystemBind(
container.withFileSystemBind(
dataFolder.toString,
"/usr/share/postgres",
BindMode.READ_WRITE
)
container.start()
container.getFirstMappedPort()

val port =
try {
container.start()
container.getFirstMappedPort()
} catch {
case e: Throwable =>
container.close()
throw e
}
logger.info(s"Postgres container started on port $port")
containers(dataFolder.toPath) = container
(container.getContainerId(), port)
}

private def alreadyStarted(
port: Int,
database: String,
logger: Logger
): Boolean = {
private def canConnect(port: Int, database: String): Boolean = {
// `CurrentThread.setContextClassLoader[org.postgresql.Driver]` should work but it does not
CurrentThread.setContextClassLoader("org.postgresql.Driver")
Try(
DriverManager.getConnection(
try {
val connection = DriverManager.getConnection(
s"jdbc:postgresql://localhost:$port/$database",
"user",
"password"
)
).fold(fa => { println(fa); false }, _ => true)
connection.close()
true
} catch {
case _: Throwable => false
}
}
}

0 comments on commit aa8fc4e

Please sign in to comment.