diff --git a/.github/workflows/benchmark-e2e.yml b/.github/workflows/benchmark-e2e.yml
new file mode 100644
index 00000000000..fec68c96a68
--- /dev/null
+++ b/.github/workflows/benchmark-e2e.yml
@@ -0,0 +1,24 @@
+name: Benchmarks E2E
+on:
+ schedule:
+ - cron: '0 23 * * *'
+ workflow_dispatch:
+jobs:
+ benchmarks:
+ name: Benchmarks E2E
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v2
+ - name: Setup Scala
+ uses: olafurpg/setup-scala@v10
+ with:
+ java-version: "openjdk@1.11"
+ - name: Run benchmarks
+ run: |
+ bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt \"benchmarks/test:runMain pl.touk.nussknacker.engine.benchmarks.e2e.FlinkSteamingScenarioBenchmark 10000000\""
+ - name: Store benchmark results
+ uses: actions/upload-artifact@v2
+ with:
+ name: benchmark-e2e.csv
+ path: tmp/benchmarkResult.csv
diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark-jmh.yml
similarity index 81%
rename from .github/workflows/benchmark.yml
rename to .github/workflows/benchmark-jmh.yml
index 52361b23fdc..feb5ef1aefb 100644
--- a/.github/workflows/benchmark.yml
+++ b/.github/workflows/benchmark-jmh.yml
@@ -1,10 +1,11 @@
-name: Benchmarks
+name: Benchmarks JMH
on:
schedule:
- cron: '0 23 * * *'
+ workflow_dispatch:
jobs:
benchmarks:
- name: Benchmarks
+ name: Benchmarks JMH
runs-on: ubuntu-latest
steps:
- name: Checkout
@@ -19,4 +20,4 @@ jobs:
uses: actions/upload-artifact@v2
with:
name: jmh-result.csv
- path: benchmarks/jmh-result.csv
\ No newline at end of file
+ path: benchmarks/jmh-result.csv
diff --git a/benchmarks/src/test/resources/benchmark-setup.override.yml b/benchmarks/src/test/resources/benchmark-setup.override.yml
new file mode 100644
index 00000000000..8c199820de3
--- /dev/null
+++ b/benchmarks/src/test/resources/benchmark-setup.override.yml
@@ -0,0 +1,17 @@
+services:
+
+ bootstrap-setup:
+ environment:
+      BENCHMARK_FLINK_DEPLOY: "false"
+ volumes:
+ - ../../benchmarks/src/test/resources/e2e/benchmark-flink:/scenario-examples/benchmark-flink
+
+ designer:
+ environment:
+ KAFKA_AUTO_OFFSET_RESET: "earliest"
+
+ kafka:
+ deploy:
+ resources:
+ limits:
+ memory: 2048M
diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/DetectLargeTransactions.json b/benchmarks/src/test/resources/e2e/benchmark-flink/DetectLargeTransactions.json
new file mode 100644
index 00000000000..ecf8cc3c378
--- /dev/null
+++ b/benchmarks/src/test/resources/e2e/benchmark-flink/DetectLargeTransactions.json
@@ -0,0 +1,202 @@
+{
+ "metaData" : {
+ "id" : "DetectLargeTransactions",
+ "additionalFields" : {
+ "description" : null,
+ "properties" : {
+ "parallelism" : "2"
+ },
+ "metaDataType": "StreamMetaData"
+ }
+ },
+ "nodes" : [
+ {
+ "id" : "transactions",
+ "ref" : {
+ "typ" : "kafka",
+ "parameters" : [
+ {
+ "name" : "Topic",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "'transactions'"
+ }
+ },
+ {
+ "name" : "Schema version",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "'latest'"
+ }
+ }
+ ]
+ },
+ "additionalFields" : {
+ "description" : null,
+ "layoutData" : {
+ "x" : 180,
+ "y" : 0
+ }
+ },
+ "type" : "Source"
+ },
+ {
+ "nextFalse" : [
+ {
+ "nextFalse" : [
+ ],
+ "id" : "only large ones",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "#input.amount > 30"
+ },
+ "isDisabled" : null,
+ "additionalFields" : {
+ "description" : null,
+ "layoutData" : {
+ "x" : 0,
+ "y" : 360
+ }
+ },
+ "type" : "Filter"
+ },
+ {
+ "id" : "send for audit",
+ "ref" : {
+ "typ" : "kafka",
+ "parameters" : [
+ {
+ "name" : "Topic",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "'processedEvents'"
+ }
+ },
+ {
+ "name" : "Schema version",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "'latest'"
+ }
+ },
+ {
+ "name" : "Key",
+ "expression" : {
+ "language" : "spel",
+ "expression" : ""
+ }
+ },
+ {
+ "name" : "Raw editor",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "true"
+ }
+ },
+ {
+ "name" : "Value validation mode",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "'strict'"
+ }
+ },
+ {
+ "name" : "Value",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "#input"
+ }
+ }
+ ]
+ },
+ "endResult" : null,
+ "isDisabled" : null,
+ "additionalFields" : {
+ "description" : null,
+ "layoutData" : {
+ "x" : 0,
+ "y" : 540
+ }
+ },
+ "type" : "Sink"
+ }
+ ],
+ "id" : "last packet",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "#input.isLast"
+ },
+ "isDisabled" : null,
+ "additionalFields" : {
+ "description" : null,
+ "layoutData" : {
+ "x" : 180,
+ "y" : 180
+ }
+ },
+ "type" : "Filter"
+ },
+ {
+ "id" : "send to alerts",
+ "ref" : {
+ "typ" : "kafka",
+ "parameters" : [
+ {
+ "name" : "Topic",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "'alerts'"
+ }
+ },
+ {
+ "name" : "Schema version",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "'latest'"
+ }
+ },
+ {
+ "name" : "Key",
+ "expression" : {
+ "language" : "spel",
+ "expression" : ""
+ }
+ },
+ {
+ "name" : "Raw editor",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "false"
+ }
+ },
+ {
+ "name" : "message",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "\"Last request\""
+ }
+ },
+ {
+ "name" : "eventDate",
+ "expression" : {
+ "language" : "spel",
+ "expression" : "#DATE.now.toEpochMilli"
+ }
+ }
+ ]
+ },
+ "endResult" : null,
+ "isDisabled" : null,
+ "additionalFields" : {
+ "description" : null,
+ "layoutData" : {
+ "x" : 360,
+ "y" : 360
+ }
+ },
+ "type" : "Sink"
+ }
+ ],
+ "additionalBranches" : [
+ ]
+}
diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/kafka/topics.txt b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/kafka/topics.txt
new file mode 100644
index 00000000000..dd723cab3d6
--- /dev/null
+++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/kafka/topics.txt
@@ -0,0 +1,3 @@
+transactions
+processedEvents
+alerts
diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/alerts.schema.json b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/alerts.schema.json
new file mode 100644
index 00000000000..30fb0f3043c
--- /dev/null
+++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/alerts.schema.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "http://json-schema.org/draft-07/schema",
+ "type": "object",
+ "properties": {
+ "message": { "type": "string" },
+ "eventDate": { "type": "integer" }
+ },
+ "required": ["message"],
+ "additionalProperties": false
+}
diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/processedEvents.schema.json b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/processedEvents.schema.json
new file mode 100644
index 00000000000..cfcf28f0a03
--- /dev/null
+++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/processedEvents.schema.json
@@ -0,0 +1,11 @@
+{
+ "$schema": "http://json-schema.org/draft-07/schema",
+ "type": "object",
+ "properties": {
+ "clientId": { "type": "string" },
+ "amount": { "type": "integer" },
+ "isLast": { "type": "boolean", "default": false }
+ },
+ "required": ["clientId", "amount"],
+ "additionalProperties": false
+}
diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/transactions.schema.json b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/transactions.schema.json
new file mode 100644
index 00000000000..cfcf28f0a03
--- /dev/null
+++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/transactions.schema.json
@@ -0,0 +1,11 @@
+{
+ "$schema": "http://json-schema.org/draft-07/schema",
+ "type": "object",
+ "properties": {
+ "clientId": { "type": "string" },
+ "amount": { "type": "integer" },
+ "isLast": { "type": "boolean", "default": false }
+ },
+ "required": ["clientId", "amount"],
+ "additionalProperties": false
+}
diff --git a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/BaseE2EBenchmark.scala b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/BaseE2EBenchmark.scala
new file mode 100644
index 00000000000..1b5f9c1003f
--- /dev/null
+++ b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/BaseE2EBenchmark.scala
@@ -0,0 +1,31 @@
+package pl.touk.nussknacker.engine.benchmarks.e2e
+
+import com.typesafe.scalalogging.LazyLogging
+import pl.touk.nussknacker.engine.version.BuildInfo
+import pl.touk.nussknacker.test.installationexample.{
+ DockerBasedInstallationExampleClient,
+ DockerBasedInstallationExampleNuEnvironment
+}
+import pl.touk.nussknacker.test.MiscUtils._
+
+// Before running benchmarks in this module, a fresh docker image should be built from sources and placed in the local
+// registry. If you run tests based on this trait in Intellij Idea and the image is not built, you can do it manually:
+// `bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt dist/Docker/publishLocal"`
+trait BaseE2EBenchmark {
+
+ val client: DockerBasedInstallationExampleClient =
+ BaseE2EBenchmark.dockerBasedInstallationExampleNuEnvironmentSingleton.client
+
+}
+
+object BaseE2EBenchmark extends LazyLogging {
+
+ val dockerBasedInstallationExampleNuEnvironmentSingleton =
+ new DockerBasedInstallationExampleNuEnvironment(
+ nussknackerImageVersion = BuildInfo.version,
+ dockerComposeTweakFiles = List(
+ BaseE2EBenchmark.getClass.getResourceAsStream("/benchmark-setup.override.yml").toFile
+ )
+ )
+
+}
diff --git a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/FlinkSteamingScenarioBenchmark.scala b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/FlinkSteamingScenarioBenchmark.scala
new file mode 100644
index 00000000000..b8836e0c2e4
--- /dev/null
+++ b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/FlinkSteamingScenarioBenchmark.scala
@@ -0,0 +1,115 @@
+package pl.touk.nussknacker.engine.benchmarks.e2e
+
+import better.files.File.root
+import cats.effect.{ExitCode, IO, IOApp}
+import com.typesafe.scalalogging.LazyLogging
+
+import java.time.{Duration, Instant}
+import scala.concurrent.duration._
+import scala.util.{Random, Try}
+
+object FlinkSteamingScenarioBenchmark extends IOApp with BaseE2EBenchmark with LazyLogging {
+
+ override def run(args: List[String]): IO[ExitCode] = {
+ for {
+ messagesCount <- readBenchmarkMessagesCount(args)
+ _ <- log("Starting message generation...")
+ prepareDataDuration <- measure(generateBenchmarkMessages(messagesCount))
+ _ <- log("Generation finished!")
+ _ <- log("Starting benchmark scenario...")
+ runScenarioDuration <- measure(runScenarioAndStartProcessing())
+ _ <- log("Scenario started. Messages are processing...")
+ verifyResultDuration <- measure(waitForProcessingFinish())
+ _ <- log(s"All messages processed.")
+ _ <- saveMeasurementsToFile(prepareDataDuration, runScenarioDuration, verifyResultDuration)
+ _ <- log(s"Results:")
+ _ <- log(s"- Preparing data: ${prepareDataDuration.duration.toSeconds} seconds")
+ _ <- log(s"- Running scenario: ${runScenarioDuration.duration.toSeconds} seconds")
+ _ <- log(s"- Verifying result: ${verifyResultDuration.duration.toSeconds} seconds")
+ } yield ExitCode.Success
+ }
+
+ private def readBenchmarkMessagesCount(args: List[String]) = IO.delay {
+ args.headOption match {
+ case Some(arg) =>
+ Try(arg.toInt)
+ .getOrElse(throw new IllegalArgumentException("Invalid number format. Please provide a valid integer."))
+ case None =>
+ throw new IllegalArgumentException("No arguments provided. Please provide an integer argument.")
+ }
+ }
+
+ private def generateBenchmarkMessages(messagesCount: Int) = IO.delay {
+ val batch = 2000
+    val noOfBatches = Math.ceil(messagesCount.toDouble / batch).toInt
+ (1 to noOfBatches)
+ .foreach { batchId =>
+ if (batchId % 10 == 0) logger.info(s"Generated ${"%.2f".format(batchId * 100.0 / noOfBatches)}% of messages...")
+ sendTestMessages(batchId, batch)
+ }
+ client.sendMessageToKafka("transactions", generateMessage(s"${messagesCount + 1}", last = true))
+ }
+
+ private def runScenarioAndStartProcessing() = IO.delay {
+ client.deployAndWaitForRunningState("DetectLargeTransactions")
+ }
+
+ private def waitForProcessingFinish(): IO[Unit] = {
+ def foundLastRequest(): IO[Boolean] = IO.delay {
+ client
+ .readAllMessagesFromKafka("alerts")
+ .exists(json => json.obj.get("message").exists(_.str == "Last request"))
+ }
+
+ foundLastRequest()
+ .flatMap {
+ case false =>
+ IO
+            .sleep(100.millis)
+ .flatMap(_ => waitForProcessingFinish())
+ case true =>
+ IO.pure(())
+ }
+ }
+
+ private def saveMeasurementsToFile(
+ prepareData: Measured[_],
+ runScenario: Measured[_],
+ verifyResult: Measured[_]
+ ): IO[Unit] = IO.delay {
+ def format(measured: Measured[_]) = "%.2f".format(measured.duration.toMillis.toDouble)
+
+ (root / "tmp" / "benchmarkResult.csv")
+ .createFileIfNotExists()
+ .clear()
+ .appendLine(s"prepareData,${format(prepareData)}")
+ .appendLine(s"runScenario,${format(runScenario)}")
+ .appendLine(s"verifyResult,${format(verifyResult)}")
+ .appendLine()
+ }
+
+ private def measure[T](action: IO[T]): IO[Measured[T]] = {
+ for {
+ started <- IO.delay(Instant.now())
+ result <- action
+ duration <- IO.delay(Duration.between(started, Instant.now()))
+ } yield Measured(result, duration)
+ }
+
+ private def sendTestMessages(batchId: Int, batchSize: Int): Unit = {
+ client.sendMessagesToKafka(
+ "transactions",
+      (0 until batchSize).map { id => generateMessage(s"${(batchId - 1) * batchSize + id}", last = false) }
+ )
+ }
+
+ private def generateMessage(id: String, last: Boolean) = ujson.Obj(
+ "amount" -> (Random.nextInt(1000) + 1),
+ "clientId" -> s"$id",
+ "isLast" -> last
+ )
+
+ private def log(message: => String) = IO.delay(logger.info(message))
+}
+
+final case class Measured[T](result: T, duration: Duration)
diff --git a/build.sbt b/build.sbt
index ca61525287b..b5a89adfb63 100644
--- a/build.sbt
+++ b/build.sbt
@@ -834,8 +834,9 @@ lazy val benchmarks = (project in file("benchmarks"))
name := "nussknacker-benchmarks",
libraryDependencies ++= {
Seq(
- "org.apache.flink" % "flink-streaming-java" % flinkV exclude ("com.esotericsoftware", "kryo-shaded"),
- "org.apache.flink" % "flink-runtime" % flinkV
+ "org.apache.flink" % "flink-streaming-java" % flinkV exclude ("com.esotericsoftware", "kryo-shaded"),
+ "org.apache.flink" % "flink-runtime" % flinkV,
+ "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % Test,
)
},
Jmh / run / javaOptions ++= (
@@ -851,6 +852,13 @@ lazy val benchmarks = (project in file("benchmarks"))
Jmh / dependencyClasspath := (Test / dependencyClasspath).value,
Jmh / generateJmhSourcesAndResources := (Jmh / generateJmhSourcesAndResources).dependsOn(Test / compile).value,
)
+ .settings {
+ // TODO: it'd be better to use scalaVersion here, but for some reason it's hard to disable existing task dynamically
+ forScalaVersion(defaultScalaV) {
+ case (2, 12) => doExecuteMainFromTestSources
+ case (2, 13) => executeMainFromTestSourcesNotSupported
+ }
+ }
.dependsOn(
designer,
extensionsApi,
@@ -862,6 +870,20 @@ lazy val benchmarks = (project in file("benchmarks"))
testUtils % Test
)
+lazy val doExecuteMainFromTestSources = Seq(
+ (Test / runMain) := (Test / runMain)
+ .dependsOn(distribution / Docker / publishLocal)
+ .evaluated
+)
+
+lazy val executeMainFromTestSourcesNotSupported = Seq(
+ (Test / runMain) := {
+ streams.value.log.info(
+ "E2E benchmarks are skipped for Scala 2.13 because Nu installation example is currently based on Scala 2.12"
+ )
+ }
+)
+
lazy val kafkaUtils = (project in utils("kafka-utils"))
.settings(commonSettings)
.settings(
@@ -1109,24 +1131,26 @@ lazy val testUtils = (project in utils("test-utils"))
name := "nussknacker-test-utils",
libraryDependencies ++= {
Seq(
- "org.scalatest" %% "scalatest" % scalaTestV,
- "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
- "com.typesafe" % "config" % configV,
- "org.typelevel" %% "cats-core" % catsV,
- "ch.qos.logback" % "logback-classic" % logbackV,
- "org.springframework" % "spring-jcl" % springV,
- "commons-io" % "commons-io" % flinkCommonsIOV,
- "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
- "com.softwaremill.sttp.client3" %% "slf4j-backend" % sttpV,
- "org.typelevel" %% "cats-effect" % catsEffectV,
- "io.circe" %% "circe-parser" % circeV,
- "org.testcontainers" % "testcontainers" % testContainersJavaV,
- "com.lihaoyi" %% "ujson" % ujsonV,
+ "com.github.pathikrit" %% "better-files" % betterFilesV,
+ "org.scalatest" %% "scalatest" % scalaTestV,
+ "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV,
+ "com.typesafe" % "config" % configV,
+ "org.typelevel" %% "cats-core" % catsV,
+ "ch.qos.logback" % "logback-classic" % logbackV,
+ "org.springframework" % "spring-jcl" % springV,
+ "commons-io" % "commons-io" % flinkCommonsIOV,
+ "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV,
+ "com.softwaremill.sttp.client3" %% "slf4j-backend" % sttpV,
+ "org.typelevel" %% "cats-effect" % catsEffectV,
+ "io.circe" %% "circe-parser" % circeV,
+ "org.testcontainers" % "testcontainers" % testContainersJavaV,
+ "com.dimafeng" %% "testcontainers-scala-core" % testContainersScalaV,
+ "com.lihaoyi" %% "ujson" % ujsonV,
// This lib produces more descriptive errors during validation than everit
- "com.networknt" % "json-schema-validator" % "1.4.0",
- "com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV,
- "com.softwaremill.sttp.tapir" %% "tapir-apispec-docs" % tapirV,
- "com.softwaremill.sttp.apispec" %% "openapi-circe-yaml" % openapiCirceYamlV,
+ "com.networknt" % "json-schema-validator" % "1.4.0",
+ "com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV,
+ "com.softwaremill.sttp.tapir" %% "tapir-apispec-docs" % tapirV,
+ "com.softwaremill.sttp.apispec" %% "openapi-circe-yaml" % openapiCirceYamlV,
) ++ restAssuredDependency(scalaVersion.value)
}
)
diff --git a/e2e-tests/src/test/resources/spec-setup/batch-config/batch-customizations.conf b/e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/batch-config/batch-customizations.conf
rename to e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf
diff --git a/e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql b/e2e-tests/src/test/resources/batch-data-generation/tables-definition.sql
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql
rename to e2e-tests/src/test/resources/batch-data-generation/tables-definition.sql
diff --git a/e2e-tests/src/test/resources/spec-setup/batch-config/transactions/transactions.ndjson b/e2e-tests/src/test/resources/batch-data-generation/transactions/transactions.ndjson
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/batch-config/transactions/transactions.ndjson
rename to e2e-tests/src/test/resources/batch-data-generation/transactions/transactions.ndjson
diff --git a/e2e-tests/src/test/resources/batch-nu-designer.override.yml b/e2e-tests/src/test/resources/batch-nu-designer.override.yml
new file mode 100644
index 00000000000..4819e8a33f9
--- /dev/null
+++ b/e2e-tests/src/test/resources/batch-nu-designer.override.yml
@@ -0,0 +1,10 @@
+services:
+
+ designer:
+ environment:
+ CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf"
+ TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql"
+ volumes:
+ - ../../e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf
+ - ../../e2e-tests/src/test/resources/batch-data-generation/transactions:/transactions
+ - ../../e2e-tests/src/test/resources/batch-data-generation/tables-definition.sql:/opt/nussknacker/conf/tables-definition.sql
diff --git a/e2e-tests/src/test/resources/bootstrap-setup-scenarios.override.yml b/e2e-tests/src/test/resources/bootstrap-setup-scenarios.override.yml
new file mode 100644
index 00000000000..6e1c1f81ab0
--- /dev/null
+++ b/e2e-tests/src/test/resources/bootstrap-setup-scenarios.override.yml
@@ -0,0 +1,6 @@
+services:
+
+ bootstrap-setup:
+ volumes:
+ - ../../e2e-tests/src/test/resources/detect-large-transactions:/scenario-examples/detect-large-transactions
+ - ../../e2e-tests/src/test/resources/determine-offered-plan:/scenario-examples/determine-offered-plan
diff --git a/e2e-tests/src/test/resources/spec-setup/debuggable-nu-designer.override.yml b/e2e-tests/src/test/resources/debuggable-nu-designer.override.yml
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/debuggable-nu-designer.override.yml
rename to e2e-tests/src/test/resources/debuggable-nu-designer.override.yml
diff --git a/e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetectLargeTransactions.json b/e2e-tests/src/test/resources/detect-large-transactions/DetectLargeTransactions.json
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetectLargeTransactions.json
rename to e2e-tests/src/test/resources/detect-large-transactions/DetectLargeTransactions.json
diff --git a/e2e-tests/src/test/resources/detect-large-transactions/setup/kafka/topics.txt b/e2e-tests/src/test/resources/detect-large-transactions/setup/kafka/topics.txt
new file mode 100644
index 00000000000..8efc1cf5ea0
--- /dev/null
+++ b/e2e-tests/src/test/resources/detect-large-transactions/setup/kafka/topics.txt
@@ -0,0 +1,2 @@
+ProcessedTransactions
+Transactions
diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/ProcessedTransactions.schema.json b/e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/ProcessedTransactions.schema.json
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/ProcessedTransactions.schema.json
rename to e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/ProcessedTransactions.schema.json
diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Transactions.schema.json b/e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/Transactions.schema.json
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Transactions.schema.json
rename to e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/Transactions.schema.json
diff --git a/e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetermineOfferedPlan.json b/e2e-tests/src/test/resources/determine-offered-plan/DetermineOfferedPlan.json
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetermineOfferedPlan.json
rename to e2e-tests/src/test/resources/determine-offered-plan/DetermineOfferedPlan.json
diff --git a/e2e-tests/src/test/resources/determine-offered-plan/setup/kafka/topics.txt b/e2e-tests/src/test/resources/determine-offered-plan/setup/kafka/topics.txt
new file mode 100644
index 00000000000..2fe924ad72f
--- /dev/null
+++ b/e2e-tests/src/test/resources/determine-offered-plan/setup/kafka/topics.txt
@@ -0,0 +1,2 @@
+Customers
+SmsesWithOffer
diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Customers.schema.json b/e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/Customers.schema.json
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Customers.schema.json
rename to e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/Customers.schema.json
diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/SmsesWithOffer.schema.json b/e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/SmsesWithOffer.schema.json
similarity index 100%
rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/SmsesWithOffer.schema.json
rename to e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/SmsesWithOffer.schema.json
diff --git a/e2e-tests/src/test/resources/spec-setup/Dockerfile b/e2e-tests/src/test/resources/spec-setup/Dockerfile
deleted file mode 100644
index dbdb4a32f3b..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/Dockerfile
+++ /dev/null
@@ -1,15 +0,0 @@
-FROM bitnami/kafka:3.7.0
-
-USER root
-
-RUN apt update && \
- apt install -y curl jq uuid-runtime kafkacat && \
- rm -rf /var/lib/apt/lists/*
-
-WORKDIR /app
-
-COPY entrypoint.sh /
-COPY scripts/ /app/scripts/
-COPY data/ /app/data/
-
-ENTRYPOINT [ "/entrypoint.sh" ]
diff --git a/e2e-tests/src/test/resources/spec-setup/batch-nu-designer.override.yml b/e2e-tests/src/test/resources/spec-setup/batch-nu-designer.override.yml
deleted file mode 100644
index 16deafd9cad..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/batch-nu-designer.override.yml
+++ /dev/null
@@ -1,10 +0,0 @@
-services:
-
- designer:
- environment:
- CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf"
- TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql"
- volumes:
- - ../../e2e-tests/src/test/resources/spec-setup/batch-config/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf
- - ../../e2e-tests/src/test/resources/spec-setup/batch-config/transactions:/transactions
- - ../../e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql:/opt/nussknacker/conf/tables-definition.sql
diff --git a/e2e-tests/src/test/resources/spec-setup/data/kafka/messages/transactions.txt b/e2e-tests/src/test/resources/spec-setup/data/kafka/messages/transactions.txt
deleted file mode 100644
index 91adff0941d..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/data/kafka/messages/transactions.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-# Example messages below (message per line)
-# { "one": 1, "two": 2 }
-# { "one": 1, "three": 3 }
diff --git a/e2e-tests/src/test/resources/spec-setup/data/kafka/topics.txt b/e2e-tests/src/test/resources/spec-setup/data/kafka/topics.txt
deleted file mode 100644
index 803c44669cf..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/data/kafka/topics.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-# Example topic names below (topic name per line)
-Customers
-ProcessedTransactions
-SmsesWithOffer
-Transactions
diff --git a/e2e-tests/src/test/resources/spec-setup/data/nu/examples.txt b/e2e-tests/src/test/resources/spec-setup/data/nu/examples.txt
deleted file mode 100644
index 2e592de900c..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/data/nu/examples.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-# Scenarios to put as examples (file with scenario from the scenarios folder per line)
-DetectLargeTransactions.json
-DetermineOfferedPlan.json
diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/active-schemas.txt b/e2e-tests/src/test/resources/spec-setup/data/schema-registry/active-schemas.txt
deleted file mode 100644
index ac31c52c373..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/active-schemas.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-# JSON schemas to load (JSON schema file name from the schemas folder per line)
-Customers.schema.json
-ProcessedTransactions.schema.json
-SmsesWithOffer.schema.json
-Transactions.schema.json
diff --git a/e2e-tests/src/test/resources/spec-setup/entrypoint.sh b/e2e-tests/src/test/resources/spec-setup/entrypoint.sh
deleted file mode 100755
index c0127828bf7..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/entrypoint.sh
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/bin/bash -e
-
-# setup other containers using auto-executed scripts
-while IFS= read -r script; do
- "$script"
-done < <(find /app/scripts/auto-executed -type f -name '*.sh' | sort)
-
-echo "Setup done!"
-# loop forever (you can use manually called utils scripts now)
-tail -f /dev/null
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/001-setup-schemas.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/001-setup-schemas.sh
deleted file mode 100755
index 1ced04b8679..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/001-setup-schemas.sh
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/bin/bash -e
-
-cd "$(dirname "$0")"
-
-function createJsonSchema() {
- if [ "$#" -ne 2 ]; then
- echo "Error: Two parameters required: 1) schema name, 2) schema file path"
- exit 11
- fi
-
- set -e
-
- local SCHEMA_NAME=$1
- local SCHEMA_FILE=$2
-
- echo "Creating schema '$SCHEMA_NAME' ..."
- ESCAPED_JSON_SCHEMA=$(awk 'BEGIN{ORS="\\n"} {gsub(/"/, "\\\"")} 1' < "$SCHEMA_FILE")
-
- local REQUEST_BODY="{
- \"schema\": \"$ESCAPED_JSON_SCHEMA\",
- \"schemaType\": \"JSON\",
- \"references\": []
- }"
-
- local RESPONSE
- RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \
- -X POST "http://schema-registry:8081/subjects/${SCHEMA_NAME}/versions" \
- -H "Content-Type: application/vnd.schemaregistry.v1+json" -d "$REQUEST_BODY"
- )
-
- local HTTP_STATUS
- HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1)
-
- if [[ "$HTTP_STATUS" != 200 ]] ; then
- local RESPONSE_BODY
- RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d)
- echo -e "Error: Cannot create schema $SCHEMA_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY"
- exit 12
- fi
-
- echo "Schema '$SCHEMA_NAME' created!"
-}
-
-echo "Starting to add preconfigured schemas ..."
-
-while IFS= read -r SCHEMA_FILENAME; do
-
- if [[ $SCHEMA_FILENAME == "#"* ]]; then
- continue
- fi
-
- SCHEMA_NAME="$(basename "$SCHEMA_FILENAME" ".schema.json")-value"
- createJsonSchema "$SCHEMA_NAME" "$(realpath ../../data/schema-registry/schemas/"$SCHEMA_FILENAME")"
-
-done < "../../data/schema-registry/active-schemas.txt"
-
-
-echo "DONE!"
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/002-setup-topics.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/002-setup-topics.sh
deleted file mode 100755
index dc44807d46d..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/002-setup-topics.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/bash -e
-
-cd "$(dirname "$0")"
-
-function createTopic() {
- if [ "$#" -ne 1 ]; then
- echo "Error: One parameter required: 1) topic name"
- exit 11
- fi
-
- set -e
- local TOPIC_NAME=$1
-
- echo "Creating topic '$TOPIC_NAME'"
- /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 --topic "$TOPIC_NAME"
- echo "Creating topic '$TOPIC_NAME'"
-}
-
-echo "Starting to create preconfigured topics ..."
-
-while IFS= read -r TOPIC_NAME; do
-
- if [[ $TOPIC_NAME == "#"* ]]; then
- continue
- fi
-
- createTopic "$TOPIC_NAME"
-
-done < "../../data/kafka/topics.txt"
-
-echo "DONE!"
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/003-import-and-deploy-example-scenarios.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/003-import-and-deploy-example-scenarios.sh
deleted file mode 100755
index a583cc18ff5..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/003-import-and-deploy-example-scenarios.sh
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/bin/bash -e
-
-cd "$(dirname "$0")"
-
-function importAndDeployScenario() {
- if [ "$#" -ne 2 ]; then
- echo "Error: Two parameters required: 1) scenario name, 2) example scenario file path"
- exit 11
- fi
-
- set -e
-
- local EXAMPLE_SCENARIO_NAME=$1
- local EXAMPLE_SCENARIO_FILE=$2
-
- ../utils/nu/load-scenario-from-json-file.sh "$EXAMPLE_SCENARIO_NAME" "$EXAMPLE_SCENARIO_FILE"
- ../utils/nu/deploy-scenario-and-wait-for-running-state.sh "$EXAMPLE_SCENARIO_NAME"
-}
-
-echo "Starting to import and deploy example scenarios ..."
-
-while IFS= read -r EXAMPLE_SCENARIO_FILENAME; do
-
- if [[ $EXAMPLE_SCENARIO_FILENAME == "#"* ]]; then
- continue
- fi
-
- EXAMPLE_SCENARIO_NAME=$(basename "$EXAMPLE_SCENARIO_FILENAME" ".json")
-
- importAndDeployScenario "$EXAMPLE_SCENARIO_NAME" "$(realpath ../../data/nu/scenarios/"$EXAMPLE_SCENARIO_FILENAME")"
-
-done < "../../data/nu/examples.txt"
-
-echo "DONE!"
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/004-send-kafka-messages.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/004-send-kafka-messages.sh
deleted file mode 100755
index cb22d3cf61e..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/004-send-kafka-messages.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/bin/bash -e
-
-cd "$(dirname "$0")"
-
-function sendMessage() {
- if [ "$#" -ne 2 ]; then
- echo "Error: Two parameters required: 1) topic name, 2) message"
- exit 11
- fi
-
- set -e
-
- local TOPIC_NAME=$1
- local MSG=$2
-
- echo "Sending message $MSG to '$TOPIC_NAME'"
- ../utils/kafka/send-to-topic.sh "$TOPIC_NAME" "$MSG"
- echo "Message sent!"
-}
-
-echo "Starting to send preconfigured messages ..."
-
-for FILE in "../../data/kafka/messages"/*; do
- if [ -f "$FILE" ]; then
- TOPIC_NAME=$(basename "$FILE")
-
- while IFS= read -r MSG; do
- if [[ $MSG == "#"* ]]; then
- continue
- fi
-
- sendMessage "$TOPIC_NAME" "$MSG"
- done < "$FILE"
- fi
-done
-
-echo "DONE!"
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/purge-topic.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/purge-topic.sh
deleted file mode 100755
index c925330e46b..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/purge-topic.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash -e
-
-if [ "$#" -ne 1 ]; then
- echo "One parameter required: 1) topic name"
- exit 1
-fi
-
-cd "$(dirname "$0")"
-
-TOPIC_NAME=$1
-DELETE_TOPIC_ORDER_FILE="/tmp/delete-$TOPIC_NAME.json"
-
-trap 'rm "$DELETE_TOPIC_ORDER_FILE"' EXIT
-
-echo "{ \"partitions\": [{ \"topic\": \"$TOPIC_NAME\", \"partition\": 0, \"offset\": -1 }], \"version\": 1 }" > "$DELETE_TOPIC_ORDER_FILE"
-
-/opt/bitnami/kafka/bin/kafka-delete-records.sh --bootstrap-server kafka:9092 -offset-json-file "$DELETE_TOPIC_ORDER_FILE"
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/read-from-topic.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/read-from-topic.sh
deleted file mode 100755
index 7501eb071d6..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/read-from-topic.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash -e
-
-if [ "$#" -ne 1 ]; then
- echo "One parameter required: 1) topic name"
- exit 1
-fi
-
-cd "$(dirname "$0")"
-
-TOPIC_NAME=$1
-
-kcat -C -b kafka:9092 -t "$TOPIC_NAME" -o beginning -e -q
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/send-to-topic.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/send-to-topic.sh
deleted file mode 100755
index 0d387168b38..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/send-to-topic.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/bin/bash -e
-
-if [ "$#" -ne 2 ]; then
- echo "Two parameters required: 1) topic name, 2) message"
- exit 1
-fi
-
-cd "$(dirname "$0")"
-
-TOPIC=$1
-MESSAGE=$2
-
-echo "$MESSAGE" | /opt/bitnami/kafka/bin/kafka-console-producer.sh --topic "$TOPIC" --bootstrap-server kafka:9092
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh
deleted file mode 100755
index 17f3a67ce9b..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh
+++ /dev/null
@@ -1,96 +0,0 @@
-#!/bin/bash -e
-
-cd "$(dirname "$0")"
-
-if [ "$#" -lt 1 ]; then
- echo "Error: One parameter required: 1) scenario name"
- exit 1
-fi
-
-SCENARIO_NAME=$1
-TIMEOUT_SECONDS=${2:-60}
-WAIT_INTERVAL=5
-
-function deployScenario() {
- if [ "$#" -ne 1 ]; then
- echo "Error: One parameter required: 1) scenario name"
- exit 11
- fi
-
- set -e
-
- local SCENARIO_NAME=$1
-
- local RESPONSE
- RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \
- -X POST "http://nginx:8080/api/processManagement/deploy/$SCENARIO_NAME"
- )
-
- local HTTP_STATUS
- HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1)
-
- if [ "$HTTP_STATUS" != "200" ]; then
- local RESPONSE_BODY
- RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d)
- echo -e "Error: Cannot run scenario $SCENARIO_NAME deployment.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY"
- exit 12
- fi
-
- echo "Scenario $SCENARIO_NAME deployment started ..."
-}
-
-function checkDeploymentStatus() {
- if [ "$#" -ne 1 ]; then
- echo "Error: One parameter required: 1) scenario name"
- exit 21
- fi
-
- set -e
-
- local SCENARIO_NAME=$1
-
- local RESPONSE
- RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \
- -X GET "http://nginx:8080/api/processes/$SCENARIO_NAME/status"
- )
-
- local HTTP_STATUS
- HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1)
- local RESPONSE_BODY
- RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d)
-
- if [ "$HTTP_STATUS" != "200" ]; then
- echo -e "Error: Cannot check scenario $SCENARIO_NAME deployment status.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY"
- exit 22
- fi
-
- local SCENARIO_STATUS
- SCENARIO_STATUS=$(echo "$RESPONSE_BODY" | jq -r '.status.name')
- echo "$SCENARIO_STATUS"
-}
-
-echo "Deploying scenario $SCENARIO_NAME ..."
-
-START_TIME=$(date +%s)
-END_TIME=$((START_TIME + TIMEOUT_SECONDS))
-
-deployScenario "$SCENARIO_NAME"
-
-while true; do
- DEPLOYMENT_STATUS=$(checkDeploymentStatus "$SCENARIO_NAME")
-
- if [ "$DEPLOYMENT_STATUS" == "RUNNING" ]; then
- break
- fi
-
- CURRENT_TIME=$(date +%s)
- if [ $CURRENT_TIME -gt $END_TIME ]; then
- echo "Error: Timeout for waiting for the RUNNING state of $SCENARIO_NAME deployment reached!"
- exit 2
- fi
-
- echo "$SCENARIO_NAME deployment state is $DEPLOYMENT_STATUS. Checking again in $WAIT_INTERVAL seconds..."
- sleep $WAIT_INTERVAL
-done
-
-echo "Scenario $SCENARIO_NAME is RUNNING!"
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json-file.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json-file.sh
deleted file mode 100755
index 03bb3276777..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json-file.sh
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/bin/bash -e
-
-cd "$(dirname "$0")"
-
-if [ "$#" -lt 2 ]; then
- echo "Error: Two parameters required: 1) scenario name, 2) scenario file path"
- exit 1
-fi
-
-SCENARIO_NAME=$1
-SCENARIO_FILE_PATH=$2
-CATEGORY=${3:-"Default"}
-
-if [ ! -f "$SCENARIO_FILE_PATH" ]; then
- echo "Error: Cannot find file $SCENARIO_FILE_PATH with scenario"
- exit 2
-fi
-
-function createEmptyScenario() {
- if [ "$#" -ne 4 ]; then
- echo "Error: Four parameters required: 1) scenario name, 2) processing mode, 3) category, 4) engine"
- exit 11
- fi
-
- set -e
-
- local SCENARIO_NAME=$1
- local PROCESSING_MODE=$2
- local CATEGORY=$3
- local ENGINE=$4
-
- local REQUEST_BODY="{
- \"name\": \"$SCENARIO_NAME\",
- \"processingMode\": \"$PROCESSING_MODE\",
- \"category\": \"$CATEGORY\",
- \"engineSetupName\": \"$ENGINE\",
- \"isFragment\": false
- }"
-
- local RESPONSE
- RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \
- -X POST "http://nginx:8080/api/processes" \
- -H "Content-Type: application/json" -d "$REQUEST_BODY"
- )
-
- local HTTP_STATUS
- HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1)
-
- if [ "$HTTP_STATUS" != "201" ]; then
- local RESPONSE_BODY
- RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d)
- echo -e "Error: Cannot create empty scenario $SCENARIO_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY"
- exit 12
- fi
-
- echo "Empty scenario $SCENARIO_NAME created successfully."
-}
-
-function importScenarioFromFile() {
- if [ "$#" -ne 2 ]; then
- echo "Error: Two parameters required: 1) scenario name, 2) scenario file path"
- exit 21
- fi
-
- set -e
-
- local SCENARIO_NAME=$1
- local SCENARIO_FILE=$2
-
- local RESPONSE
- RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \
- -X POST "http://nginx:8080/api/processes/import/$SCENARIO_NAME" \
- -F "process=@$SCENARIO_FILE"
- )
-
- # Check response body and status code
- local HTTP_STATUS
- HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1)
-
- local RESPONSE_BODY
- RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d)
-
- if [ "$HTTP_STATUS" == "200" ]; then
- local SCENARIO_GRAPH
- SCENARIO_GRAPH=$(echo "$RESPONSE_BODY" | jq '.scenarioGraph')
- echo "$SCENARIO_GRAPH"
- else
- echo -e "Error: Cannot import scenario $SCENARIO_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY"
- exit 22
- fi
-}
-
-function saveScenario() {
- if [ "$#" -ne 2 ]; then
- echo "Error: Two parameters required: 1) scenario name, 2) scenario graph JSON representation"
- exit 31
- fi
-
- set -e
-
- local SCENARIO_NAME=$1
- local SCENARIO_GRAPH_JSON=$2
-
- local REQUEST_BODY="{
- \"scenarioGraph\": $SCENARIO_GRAPH_JSON,
- \"comment\": \"\"
- }"
-
- local RESPONSE
- RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \
- -X PUT "http://nginx:8080/api/processes/$SCENARIO_NAME" \
- -H "Content-Type: application/json" -d "$REQUEST_BODY"
- )
-
- local HTTP_STATUS
- HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1)
-
- if [ "$HTTP_STATUS" != "200" ]; then
- local RESPONSE_BODY
- RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d)
- echo -e "Error: Cannot save scenario $SCENARIO_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY"
- exit 32
- fi
-
- echo "Scenario $SCENARIO_NAME saved successfully."
-}
-
-META_DATA_TYPE=$(jq -r .metaData.additionalFields.metaDataType < "$SCENARIO_FILE_PATH")
-case "$META_DATA_TYPE" in
- "StreamMetaData")
- ENGINE="Flink"
- PROCESSING_MODE="Unbounded-Stream"
- ;;
- "LiteStreamMetaData")
- ENGINE="Lite K8s"
- PROCESSING_MODE="Unbounded-Stream"
- ;;
- "RequestResponseMetaData")
- ENGINE="Lite K8s"
- PROCESSING_MODE="Request-Response"
- ;;
- *)
- echo "Error: Cannot import scenario with metadata type: $META_DATA_TYPE"
- exit 2
- ;;
-esac
-
-createEmptyScenario "$SCENARIO_NAME" "$PROCESSING_MODE" "$CATEGORY" "$ENGINE"
-SCENARIO_GRAPH=$(importScenarioFromFile "$SCENARIO_NAME" "$SCENARIO_FILE_PATH")
-saveScenario "$SCENARIO_NAME" "$SCENARIO_GRAPH"
diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json.sh
deleted file mode 100755
index 8e80c5e75e9..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash -e
-
-cd "$(dirname "$0")"
-
-if [ "$#" -lt 2 ]; then
- echo "Error: Two parameters required: 1) scenario name, 2) scenario JSON"
- exit 1
-fi
-
-SCENARIO_NAME=$1
-SCENARIO_JSON=$2
-SCENARIO_JSON_FILE="/tmp/scenario-$SCENARIO_NAME.json"
-
-echo "$SCENARIO_JSON" > "$SCENARIO_JSON_FILE"
-trap 'rm "$SCENARIO_JSON_FILE"' EXIT
-
-./load-scenario-from-json-file.sh "$SCENARIO_NAME" "$SCENARIO_JSON_FILE"
diff --git a/e2e-tests/src/test/resources/spec-setup/spec-setup.override.yml b/e2e-tests/src/test/resources/spec-setup/spec-setup.override.yml
deleted file mode 100644
index 0be16dc9ec8..00000000000
--- a/e2e-tests/src/test/resources/spec-setup/spec-setup.override.yml
+++ /dev/null
@@ -1,9 +0,0 @@
-services:
-
- spec-setup:
- build:
- context: ../../e2e-tests/src/test/resources/spec-setup/
- dockerfile: Dockerfile
- depends_on:
- nginx:
- condition: service_healthy
diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/BaseE2eSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/BaseE2eSpec.scala
new file mode 100644
index 00000000000..d4e1490639c
--- /dev/null
+++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/BaseE2eSpec.scala
@@ -0,0 +1,36 @@
+package pl.touk.nussknacker
+
+import better.files._
+import com.typesafe.scalalogging.LazyLogging
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
+import pl.touk.nussknacker.engine.version.BuildInfo
+import pl.touk.nussknacker.test.installationexample.{
+ DockerBasedInstallationExampleClient,
+ DockerBasedInstallationExampleNuEnvironment
+}
+
+import java.io.{File => JFile}
+
+// Before running tests in this module, a fresh docker image should be built from sources and placed in the local
+// registry. If you run tests based on this trait in IntelliJ IDEA and the image is not built, you can do it manually:
+// `bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt dist/Docker/publishLocal"`
+trait BaseE2eSpec extends BeforeAndAfterAll with BeforeAndAfterEach with LazyLogging {
+ this: Suite =>
+
+ val client: DockerBasedInstallationExampleClient =
+ BaseE2eSpec.dockerBasedInstallationExampleNuEnvironmentSingleton.client
+}
+
+object BaseE2eSpec extends LazyLogging {
+
+ val dockerBasedInstallationExampleNuEnvironmentSingleton =
+ new DockerBasedInstallationExampleNuEnvironment(
+ nussknackerImageVersion = BuildInfo.version,
+ dockerComposeTweakFiles = List(
+ new JFile(Resource.getUrl("bootstrap-setup-scenarios.override.yml").toURI),
+ new JFile(Resource.getUrl("batch-nu-designer.override.yml").toURI),
+ new JFile(Resource.getUrl("debuggable-nu-designer.override.yml").toURI)
+ )
+ )
+
+}
diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala
index e7b3fc5b299..3017ec0afcc 100644
--- a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala
+++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala
@@ -13,7 +13,7 @@ import pl.touk.nussknacker.ui.process.marshall.CanonicalProcessConverter.toScena
class BatchDataGenerationSpec
extends AnyFreeSpecLike
- with DockerBasedInstallationExampleNuEnvironment
+ with BaseE2eSpec
with Matchers
with VeryPatientScalaFutures
with NuRestAssureExtensions
diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala
index d987ba913b0..e0758e4ddcc 100644
--- a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala
+++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala
@@ -4,11 +4,7 @@ import org.scalatest.freespec.AnyFreeSpecLike
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.test.VeryPatientScalaFutures
-class DetectLargeTransactionSpec
- extends AnyFreeSpecLike
- with DockerBasedInstallationExampleNuEnvironment
- with Matchers
- with VeryPatientScalaFutures {
+class DetectLargeTransactionSpec extends AnyFreeSpecLike with BaseE2eSpec with Matchers with VeryPatientScalaFutures {
"Large transactions should be properly detected" in {
val smallAmountTransactions = List(
@@ -23,18 +19,18 @@ class DetectLargeTransactionSpec
)
(smallAmountTransactions ::: largeAmountTransactions).foreach { transaction =>
- sendMessageToKafka("Transactions", transaction)
+ client.sendMessageToKafka("Transactions", transaction)
}
eventually {
- val processedTransactions = readAllMessagesFromKafka("ProcessedTransactions")
+ val processedTransactions = client.readAllMessagesFromKafka("ProcessedTransactions")
processedTransactions should equal(largeAmountTransactions)
}
}
override protected def afterEach(): Unit = {
- purgeKafkaTopic("Transactions")
- purgeKafkaTopic("ProcessedTransactions")
+ client.purgeKafkaTopic("Transactions")
+ client.purgeKafkaTopic("ProcessedTransactions")
super.afterEach()
}
diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala
index 49d57696cc8..2d5f0ce92ab 100644
--- a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala
+++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala
@@ -4,11 +4,7 @@ import org.scalatest.freespec.AnyFreeSpecLike
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.test.VeryPatientScalaFutures
-class DetermineOfferedPlanSpec
- extends AnyFreeSpecLike
- with DockerBasedInstallationExampleNuEnvironment
- with Matchers
- with VeryPatientScalaFutures {
+class DetermineOfferedPlanSpec extends AnyFreeSpecLike with BaseE2eSpec with Matchers with VeryPatientScalaFutures {
"Properly determine offers for customers" in {
val customers = List(
@@ -19,11 +15,11 @@ class DetermineOfferedPlanSpec
)
customers.foreach { customer =>
- sendMessageToKafka("Customers", customer)
+ client.sendMessageToKafka("Customers", customer)
}
eventually {
- val smses = readAllMessagesFromKafka("SmsesWithOffer")
+ val smses = client.readAllMessagesFromKafka("SmsesWithOffer")
smses should equal(
List(
smsWithOfferJson("Nick", "Junior Package"),
@@ -34,8 +30,8 @@ class DetermineOfferedPlanSpec
}
override protected def afterEach(): Unit = {
- purgeKafkaTopic("Customers")
- purgeKafkaTopic("SmsesWithOffer")
+ client.purgeKafkaTopic("Customers")
+ client.purgeKafkaTopic("SmsesWithOffer")
super.afterEach()
}
diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/DockerBasedInstallationExampleNuEnvironment.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/DockerBasedInstallationExampleNuEnvironment.scala
deleted file mode 100644
index 17311c831f2..00000000000
--- a/e2e-tests/src/test/scala/pl/touk/nussknacker/DockerBasedInstallationExampleNuEnvironment.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-package pl.touk.nussknacker
-
-import better.files._
-import com.dimafeng.testcontainers._
-import com.typesafe.scalalogging.LazyLogging
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
-import org.testcontainers.containers.output.Slf4jLogConsumer
-import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy
-import pl.touk.nussknacker.test.containers.ContainerExt._
-import pl.touk.nussknacker.DockerBasedInstallationExampleNuEnvironment.{JSON, singletonContainer}
-import ujson.Value
-import pl.touk.nussknacker.engine.version.BuildInfo
-
-import java.io.{File => JFile}
-import java.time.Duration
-
-// Before running tests in this module, a fresh docker image should be built from sources and placed in the local
-// registry. If you run tests based on this trait in Intellij Idea and the images is not built, you can do it manually:
-// `bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt dist/Docker/publishLocal"`
-trait DockerBasedInstallationExampleNuEnvironment extends BeforeAndAfterAll with BeforeAndAfterEach with LazyLogging {
- this: Suite =>
-
- private val specSetupService = unsafeContainerByServiceName("spec-setup")
-
- def loadFlinkStreamingScenarioFromResource(scenarioName: String, scenarioJsonFile: File): Unit = {
- val escapedScenarioJson = scenarioJsonFile.contentAsString().replaceAll("\"", "\\\\\"")
- specSetupService.executeBash(
- s"""/app/scripts/utils/nu/load-scenario-from-json.sh "$scenarioName" "$escapedScenarioJson" """
- )
- }
-
- def deployAndWaitForRunningState(scenarioName: String): Unit = {
- specSetupService.executeBash(
- s"""/app/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh "$scenarioName" """
- )
- }
-
- def sendMessageToKafka(topic: String, message: JSON): Unit = {
- val escapedMessage = message.render().replaceAll("\"", "\\\\\"")
- specSetupService.executeBash(s"""/app/scripts/utils/kafka/send-to-topic.sh "$topic" "$escapedMessage" """)
- }
-
- def readAllMessagesFromKafka(topic: String): List[JSON] = {
- specSetupService
- .executeBashAndReadStdout(s"""/app/scripts/utils/kafka/read-from-topic.sh "$topic" """)
- .split("\n")
- .toList
- .map(ujson.read(_))
- }
-
- def purgeKafkaTopic(topic: String): Unit = {
- specSetupService.executeBash(s"""/app/scripts/utils/kafka/purge-topic.sh "$topic" """)
- }
-
- private def unsafeContainerByServiceName(name: String) = singletonContainer
- .getContainerByServiceName(name)
- .getOrElse(throw new IllegalStateException(s"'$name' service not available!"))
-
-}
-
-object DockerBasedInstallationExampleNuEnvironment extends LazyLogging {
-
- type JSON = Value
-
- val singletonContainer: DockerComposeContainer = new DockerComposeContainer(
- composeFiles = Seq(
- new JFile("examples/installation/docker-compose.yml"),
- new JFile(Resource.getUrl("spec-setup/spec-setup.override.yml").toURI),
- new JFile(Resource.getUrl("spec-setup/batch-nu-designer.override.yml").toURI),
- new JFile(Resource.getUrl("spec-setup/debuggable-nu-designer.override.yml").toURI)
- ),
- env = Map(
- "NUSSKNACKER_VERSION" -> BuildInfo.version
- ),
- logConsumers = Seq(
- ServiceLogConsumer("spec-setup", new Slf4jLogConsumer(logger.underlying))
- ),
- waitingFor = Some(
- WaitingForService(
- "spec-setup",
- new LogMessageWaitStrategy()
- .withRegEx("^Setup done!.*")
- .withStartupTimeout(Duration.ofSeconds(120L))
- )
- ),
- // Change to 'true' to enable logging
- tailChildContainers = false
- )
-
- singletonContainer.start()
-
-}
diff --git a/examples/installation/.env b/examples/installation/.env
deleted file mode 100644
index 35735ab8847..00000000000
--- a/examples/installation/.env
+++ /dev/null
@@ -1 +0,0 @@
-NUSSKNACKER_VERSION=1.15.3
diff --git a/nussknacker-dist/src/universal/conf/logback.xml b/nussknacker-dist/src/universal/conf/logback.xml
index ce23f616eb6..be2d2f4f5c5 100644
--- a/nussknacker-dist/src/universal/conf/logback.xml
+++ b/nussknacker-dist/src/universal/conf/logback.xml
@@ -55,5 +55,6 @@
+
diff --git a/utils/test-utils/src/main/resources/bootstrap-setup.override.yml b/utils/test-utils/src/main/resources/bootstrap-setup.override.yml
new file mode 100644
index 00000000000..c789679792f
--- /dev/null
+++ b/utils/test-utils/src/main/resources/bootstrap-setup.override.yml
@@ -0,0 +1,24 @@
+services:
+
+ bootstrap-setup:
+ image: touk/nussknacker-example-scenarios-library:0.3.0
+ depends_on:
+ nginx:
+ condition: service_healthy
+ designer:
+ condition: service_healthy
+ kafka:
+ condition: service_healthy
+ schema-registry:
+ condition: service_healthy
+ environment:
+ DISABLE_EMBEDDED_EXAMPLES: true
+ NU_DESIGNER_ADDRESS: "nginx:8080"
+ NU_REQUEST_RESPONSE_OPEN_API_SERVICE_ADDRESS: "nginx:8181"
+ KAFKA_ADDRESS: "kafka:9092"
+ SCHEMA_REGISTRY_ADDRESS: "schema-registry:8081"
+ deploy:
+ resources:
+ limits:
+ memory: 256M
+ cpus: '0.5'
diff --git a/utils/test-utils/src/main/resources/logback-test.xml b/utils/test-utils/src/main/resources/logback-test.xml
index b36cf2e7ef1..2407b450eb6 100644
--- a/utils/test-utils/src/main/resources/logback-test.xml
+++ b/utils/test-utils/src/main/resources/logback-test.xml
@@ -63,6 +63,6 @@
-
+
diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/MiscUtils.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/MiscUtils.scala
new file mode 100644
index 00000000000..bc9972034d0
--- /dev/null
+++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/MiscUtils.scala
@@ -0,0 +1,21 @@
+package pl.touk.nussknacker.test
+
+import org.apache.commons.io.IOUtils
+
+import java.io.{File, FileOutputStream, InputStream}
+
+object MiscUtils {
+
+ implicit class InputStreamOps(val in: InputStream) extends AnyVal {
+
+ def toFile: File = {
+ val tempFile = File.createTempFile("Nussknacker", null)
+ tempFile.deleteOnExit()
+ val out = new FileOutputStream(tempFile);
+ IOUtils.copy(in, out);
+ tempFile
+ }
+
+ }
+
+}
diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala
index 96324f4e95f..f6b90cb1ad0 100644
--- a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala
+++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala
@@ -8,11 +8,11 @@ import scala.language.implicitConversions
class ContainerExt(val container: ContainerState) extends LazyLogging {
def executeBash(cmd: String): Unit = {
- logger.info(executeBashAndReadStdout(cmd))
+ executeBashAndReadStdout(cmd)
}
def executeBashAndReadStdout(cmd: String): String = {
- logger.info(s"Calling command '$cmd' on container '${container.getContainerInfo.getName}' ...")
+ logger.debug(s"Calling command '$cmd' on container '${container.getContainerInfo.getName}' ...")
val exitResult = container.execInContainer("bash", "-c", cmd)
exitResult.getExitCode match {
case 0 =>
diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala
new file mode 100644
index 00000000000..53a694e4841
--- /dev/null
+++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala
@@ -0,0 +1,90 @@
+package pl.touk.nussknacker.test.installationexample
+
+import com.dimafeng.testcontainers.{DockerComposeContainer, ServiceLogConsumer, WaitingForService}
+import com.typesafe.scalalogging.LazyLogging
+import org.slf4j.Logger
+import org.testcontainers.containers.output.Slf4jLogConsumer
+import org.testcontainers.containers.wait.strategy.DockerHealthcheckWaitStrategy
+import pl.touk.nussknacker.test.containers.ContainerExt.toContainerExt
+import pl.touk.nussknacker.test.installationexample.DockerBasedInstallationExampleNuEnvironment.{JSON, slf4jLogger}
+import ujson.Value
+import pl.touk.nussknacker.test.MiscUtils._
+
+import java.io.{File => JFile}
+import java.time.Duration
+
+class DockerBasedInstallationExampleNuEnvironment(
+ nussknackerImageVersion: String,
+ dockerComposeTweakFiles: Iterable[JFile]
+) extends DockerComposeContainer(
+ composeFiles = new JFile("examples/installation/docker-compose.yml") ::
+ DockerBasedInstallationExampleNuEnvironment.getClass
+ .getResourceAsStream("/bootstrap-setup.override.yml")
+ .toFile ::
+ dockerComposeTweakFiles.toList,
+ env = Map(
+ "NUSSKNACKER_VERSION" -> nussknackerImageVersion
+ ),
+ logConsumers = Seq(
+ ServiceLogConsumer("bootstrap-setup", new Slf4jLogConsumer(slf4jLogger))
+ ),
+ waitingFor = Some(
+ WaitingForService("bootstrap-setup", new DockerHealthcheckWaitStrategy().withStartupTimeout(Duration.ofSeconds(120)))
+ ),
+ // Change to 'true' to enable logging
+ tailChildContainers = false
+ ) {
+
+ start()
+
+ val client: DockerBasedInstallationExampleClient = new DockerBasedInstallationExampleClient(this)
+}
+
+object DockerBasedInstallationExampleNuEnvironment extends LazyLogging {
+
+ type JSON = Value
+
+ private def slf4jLogger: Logger = logger.underlying
+
+}
+
+class DockerBasedInstallationExampleClient(env: DockerBasedInstallationExampleNuEnvironment) {
+
+ private val bootstrapSetupService = unsafeContainerByServiceName("bootstrap-setup")
+
+ def deployAndWaitForRunningState(scenarioName: String): Unit = {
+ bootstrapSetupService.executeBash(
+ s"""/app/utils/nu/deploy-scenario-and-wait-for-running-state.sh "$scenarioName" """
+ )
+ }
+
+ def sendMessageToKafka(topic: String, message: JSON): Unit = {
+ sendMessagesToKafka(topic, message :: Nil)
+ }
+
+ def sendMessagesToKafka(topic: String, messages: Iterable[JSON]): Unit = {
+ val escapedMessages = messages.map(_.render().replaceAll("\"", "\\\\\"")).mkString("\n")
+ bootstrapSetupService.executeBash(s"""/app/utils/kafka/send-to-topic.sh "$topic" "$escapedMessages" """)
+ }
+
+ def readAllMessagesFromKafka(topic: String): List[JSON] = {
+ bootstrapSetupService
+ .executeBashAndReadStdout(s"""/app/utils/kafka/read-from-topic.sh "$topic" """)
+ .split("\n")
+ .flatMap {
+ case "" => None
+ case str => Option(str)
+ }
+ .toList
+ .map(ujson.read(_))
+ }
+
+ def purgeKafkaTopic(topic: String): Unit = {
+ bootstrapSetupService.executeBash(s"""/app/utils/kafka/purge-topic.sh "$topic" """)
+ }
+
+ private def unsafeContainerByServiceName(name: String) = env
+ .getContainerByServiceName(name)
+ .getOrElse(throw new IllegalStateException(s"'$name' service not available!"))
+
+}