From 8fd58c7729224d1f7d6f880d78083d9b269ead59 Mon Sep 17 00:00:00 2001 From: mkl <158037696+mk-software-pl@users.noreply.github.com> Date: Fri, 6 Sep 2024 11:26:18 +0200 Subject: [PATCH] [NU-1734] E2E Flink Streaming benchmark moved from Quickstart repo (#6777) --- .github/workflows/benchmark-e2e.yml | 24 +++ .../{benchmark.yml => benchmark-jmh.yml} | 7 +- .../resources/benchmark-setup.override.yml | 17 ++ .../DetectLargeTransactions.json | 202 ++++++++++++++++++ .../benchmark-flink/setup/kafka/topics.txt | 3 + .../setup/schema-registry/alerts.schema.json | 10 + .../processedEvents.schema.json | 11 + .../schema-registry/transactions.schema.json | 11 + .../benchmarks/e2e/BaseE2EBenchmark.scala | 31 +++ .../e2e/FlinkSteamingScenarioBenchmark.scala | 115 ++++++++++ build.sbt | 62 ++++-- .../batch-customizations.conf | 0 .../tables-definition.sql | 0 .../transactions/transactions.ndjson | 0 .../resources/batch-nu-designer.override.yml | 10 + .../bootstrap-setup-scenarios.override.yml | 6 + .../debuggable-nu-designer.override.yml | 0 .../DetectLargeTransactions.json | 0 .../setup/kafka/topics.txt | 2 + .../ProcessedTransactions.schema.json | 0 .../schema-registry}/Transactions.schema.json | 0 .../DetermineOfferedPlan.json | 0 .../setup/kafka/topics.txt | 2 + .../schema-registry}/Customers.schema.json | 0 .../SmsesWithOffer.schema.json | 0 .../src/test/resources/spec-setup/Dockerfile | 15 -- .../spec-setup/batch-nu-designer.override.yml | 10 - .../data/kafka/messages/transactions.txt | 3 - .../spec-setup/data/kafka/topics.txt | 5 - .../resources/spec-setup/data/nu/examples.txt | 3 - .../data/schema-registry/active-schemas.txt | 5 - .../test/resources/spec-setup/entrypoint.sh | 10 - .../auto-executed/001-setup-schemas.sh | 58 ----- .../scripts/auto-executed/002-setup-topics.sh | 31 --- ...003-import-and-deploy-example-scenarios.sh | 34 --- .../auto-executed/004-send-kafka-messages.sh | 37 ---- .../scripts/utils/kafka/purge-topic.sh | 17 -- .../scripts/utils/kafka/read-from-topic.sh | 12 -- .../scripts/utils/kafka/send-to-topic.sh | 13 -- ...loy-scenario-and-wait-for-running-state.sh | 96 --------- .../utils/nu/load-scenario-from-json-file.sh | 150 ------------- .../utils/nu/load-scenario-from-json.sh | 17 -- .../spec-setup/spec-setup.override.yml | 9 - .../pl/touk/nussknacker/BaseE2eSpec.scala | 36 ++++ .../nussknacker/BatchDataGenerationSpec.scala | 2 +- .../DetectLargeTransactionSpec.scala | 14 +- .../DetermineOfferedPlanSpec.scala | 14 +- ...asedInstallationExampleNuEnvironment.scala | 92 -------- examples/installation/.env | 1 - .../src/universal/conf/logback.xml | 1 + .../resources/bootstrap-setup.override.yml | 24 +++ .../src/main/resources/logback-test.xml | 2 +- .../pl/touk/nussknacker/test/MiscUtils.scala | 21 ++ .../test/containers/ContainerExt.scala | 4 +- ...asedNuInstallationExampleEnvironment.scala | 90 ++++++++ 55 files changed, 677 insertions(+), 662 deletions(-) create mode 100644 .github/workflows/benchmark-e2e.yml rename .github/workflows/{benchmark.yml => benchmark-jmh.yml} (81%) create mode 100644 benchmarks/src/test/resources/benchmark-setup.override.yml create mode 100644 benchmarks/src/test/resources/e2e/benchmark-flink/DetectLargeTransactions.json create mode 100644 benchmarks/src/test/resources/e2e/benchmark-flink/setup/kafka/topics.txt create mode 100644 benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/alerts.schema.json create mode 100644 benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/processedEvents.schema.json create mode 100644 benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/transactions.schema.json create mode 100644 benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/BaseE2EBenchmark.scala create mode 100644 benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/FlinkSteamingScenarioBenchmark.scala rename e2e-tests/src/test/resources/{spec-setup/batch-config => batch-data-generation}/batch-customizations.conf (100%) rename e2e-tests/src/test/resources/{spec-setup/batch-config => batch-data-generation}/tables-definition.sql (100%) rename e2e-tests/src/test/resources/{spec-setup/batch-config => batch-data-generation}/transactions/transactions.ndjson (100%) create mode 100644 e2e-tests/src/test/resources/batch-nu-designer.override.yml create mode 100644 e2e-tests/src/test/resources/bootstrap-setup-scenarios.override.yml rename e2e-tests/src/test/resources/{spec-setup => }/debuggable-nu-designer.override.yml (100%) rename e2e-tests/src/test/resources/{spec-setup/data/nu/scenarios => detect-large-transactions}/DetectLargeTransactions.json (100%) create mode 100644 e2e-tests/src/test/resources/detect-large-transactions/setup/kafka/topics.txt rename e2e-tests/src/test/resources/{spec-setup/data/schema-registry/schemas => detect-large-transactions/setup/schema-registry}/ProcessedTransactions.schema.json (100%) rename e2e-tests/src/test/resources/{spec-setup/data/schema-registry/schemas => detect-large-transactions/setup/schema-registry}/Transactions.schema.json (100%) rename e2e-tests/src/test/resources/{spec-setup/data/nu/scenarios => determine-offered-plan}/DetermineOfferedPlan.json (100%) create mode 100644 e2e-tests/src/test/resources/determine-offered-plan/setup/kafka/topics.txt rename e2e-tests/src/test/resources/{spec-setup/data/schema-registry/schemas => determine-offered-plan/setup/schema-registry}/Customers.schema.json (100%) rename e2e-tests/src/test/resources/{spec-setup/data/schema-registry/schemas => determine-offered-plan/setup/schema-registry}/SmsesWithOffer.schema.json (100%) delete mode 100644 e2e-tests/src/test/resources/spec-setup/Dockerfile delete mode 100644 e2e-tests/src/test/resources/spec-setup/batch-nu-designer.override.yml delete mode 100644 e2e-tests/src/test/resources/spec-setup/data/kafka/messages/transactions.txt delete mode 100644 e2e-tests/src/test/resources/spec-setup/data/kafka/topics.txt delete mode 100644 e2e-tests/src/test/resources/spec-setup/data/nu/examples.txt delete mode 100644 e2e-tests/src/test/resources/spec-setup/data/schema-registry/active-schemas.txt delete mode 100755 e2e-tests/src/test/resources/spec-setup/entrypoint.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/001-setup-schemas.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/002-setup-topics.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/003-import-and-deploy-example-scenarios.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/004-send-kafka-messages.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/purge-topic.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/read-from-topic.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/send-to-topic.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json-file.sh delete mode 100755 e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json.sh delete mode 100644 e2e-tests/src/test/resources/spec-setup/spec-setup.override.yml create mode 100644 e2e-tests/src/test/scala/pl/touk/nussknacker/BaseE2eSpec.scala delete mode 100644 e2e-tests/src/test/scala/pl/touk/nussknacker/DockerBasedInstallationExampleNuEnvironment.scala delete mode 100644 examples/installation/.env create mode 100644 utils/test-utils/src/main/resources/bootstrap-setup.override.yml create mode 100644 utils/test-utils/src/main/scala/pl/touk/nussknacker/test/MiscUtils.scala create mode 100644 utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala diff --git a/.github/workflows/benchmark-e2e.yml b/.github/workflows/benchmark-e2e.yml new file mode 100644 index 00000000000..fec68c96a68 --- /dev/null +++ b/.github/workflows/benchmark-e2e.yml @@ -0,0 +1,24 @@ +name: Benchmarks E2E +on: + schedule: + - cron: '0 23 * * *' + workflow_dispatch: +jobs: + benchmarks: + name: Benchmarks E2E + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Setup Scala + uses: olafurpg/setup-scala@v10 + with: + java-version: "openjdk@1.11" + - name: Run benchmarks + run: | + bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt \"benchmarks/test:runMain pl.touk.nussknacker.engine.benchmarks.e2e.FlinkSteamingScenarioBenchmark 10000000\"" + - name: Store benchmark results + uses: actions/upload-artifact@v2 + with: + name: benchmark-e2e.csv + path: tmp/benchmarkResult.csv diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark-jmh.yml similarity index 81% rename from .github/workflows/benchmark.yml rename to .github/workflows/benchmark-jmh.yml index 52361b23fdc..feb5ef1aefb 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark-jmh.yml @@ -1,10 +1,11 @@ -name: Benchmarks +name: Benchmarks JMH on: schedule: - cron: '0 23 * * *' + workflow_dispatch: jobs: benchmarks: - name: Benchmarks + name: Benchmarks JMH runs-on: ubuntu-latest steps: - name: Checkout @@ -19,4 +20,4 @@ jobs: uses: actions/upload-artifact@v2 with: name: jmh-result.csv - path: benchmarks/jmh-result.csv \ No newline at end of file + path: benchmarks/jmh-result.csv diff --git a/benchmarks/src/test/resources/benchmark-setup.override.yml b/benchmarks/src/test/resources/benchmark-setup.override.yml new file mode 100644 index 00000000000..8c199820de3 --- /dev/null +++ b/benchmarks/src/test/resources/benchmark-setup.override.yml @@ -0,0 +1,17 @@ +services: + + bootstrap-setup: + environment: + BENCHMARK_FLINK_DEPLOY: false + volumes: + - ../../benchmarks/src/test/resources/e2e/benchmark-flink:/scenario-examples/benchmark-flink + + designer: + environment: + KAFKA_AUTO_OFFSET_RESET: "earliest" + + kafka: + deploy: + resources: + limits: + memory: 2048M diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/DetectLargeTransactions.json b/benchmarks/src/test/resources/e2e/benchmark-flink/DetectLargeTransactions.json new file mode 100644 index 00000000000..ecf8cc3c378 --- /dev/null +++ b/benchmarks/src/test/resources/e2e/benchmark-flink/DetectLargeTransactions.json @@ -0,0 +1,202 @@ +{ + "metaData" : { + "id" : "DetectLargeTransactions", + "additionalFields" : { + "description" : null, + "properties" : { + "parallelism" : "2" + }, + "metaDataType": "StreamMetaData" + } + }, + "nodes" : [ + { + "id" : "transactions", + "ref" : { + "typ" : "kafka", + "parameters" : [ + { + "name" : "Topic", + "expression" : { + "language" : "spel", + "expression" : "'transactions'" + } + }, + { + "name" : "Schema version", + "expression" : { + "language" : "spel", + "expression" : "'latest'" + } + } + ] + }, + "additionalFields" : { + "description" : null, + "layoutData" : { + "x" : 180, + "y" : 0 + } + }, + "type" : "Source" + }, + { + "nextFalse" : [ + { + "nextFalse" : [ + ], + "id" : "only large ones", + "expression" : { + "language" : "spel", + "expression" : "#input.amount > 30" + }, + "isDisabled" : null, + "additionalFields" : { + "description" : null, + "layoutData" : { + "x" : 0, + "y" : 360 + } + }, + "type" : "Filter" + }, + { + "id" : "send for audit", + "ref" : { + "typ" : "kafka", + "parameters" : [ + { + "name" : "Topic", + "expression" : { + "language" : "spel", + "expression" : "'processedEvents'" + } + }, + { + "name" : "Schema version", + "expression" : { + "language" : "spel", + "expression" : "'latest'" + } + }, + { + "name" : "Key", + "expression" : { + "language" : "spel", + "expression" : "" + } + }, + { + "name" : "Raw editor", + "expression" : { + "language" : "spel", + "expression" : "true" + } + }, + { + "name" : "Value validation mode", + "expression" : { + "language" : "spel", + "expression" : "'strict'" + } + }, + { + "name" : "Value", + "expression" : { + "language" : "spel", + "expression" : "#input" + } + } + ] + }, + "endResult" : null, + "isDisabled" : null, + "additionalFields" : { + "description" : null, + "layoutData" : { + "x" : 0, + "y" : 540 + } + }, + "type" : "Sink" + } + ], + "id" : "last packet", + "expression" : { + "language" : "spel", + "expression" : "#input.isLast" + }, + "isDisabled" : null, + "additionalFields" : { + "description" : null, + "layoutData" : { + "x" : 180, + "y" : 180 + } + }, + "type" : "Filter" + }, + { + "id" : "send to alerts", + "ref" : { + "typ" : "kafka", + "parameters" : [ + { + "name" : "Topic", + "expression" : { + "language" : "spel", + "expression" : "'alerts'" + } + }, + { + "name" : "Schema version", + "expression" : { + "language" : "spel", + "expression" : "'latest'" + } + }, + { + "name" : "Key", + "expression" : { + "language" : "spel", + "expression" : "" + } + }, + { + "name" : "Raw editor", + "expression" : { + "language" : "spel", + "expression" : "false" + } + }, + { + "name" : "message", + "expression" : { + "language" : "spel", + "expression" : "\"Last request\"" + } + }, + { + "name" : "eventDate", + "expression" : { + "language" : "spel", + "expression" : "#DATE.now.toEpochMilli" + } + } + ] + }, + "endResult" : null, + "isDisabled" : null, + "additionalFields" : { + "description" : null, + "layoutData" : { + "x" : 360, + "y" : 360 + } + }, + "type" : "Sink" + } + ], + "additionalBranches" : [ + ] +} diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/kafka/topics.txt b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/kafka/topics.txt new file mode 100644 index 00000000000..dd723cab3d6 --- /dev/null +++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/kafka/topics.txt @@ -0,0 +1,3 @@ +transactions +processedEvents +alerts diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/alerts.schema.json b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/alerts.schema.json new file mode 100644 index 00000000000..30fb0f3043c --- /dev/null +++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/alerts.schema.json @@ -0,0 +1,10 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema", + "type": "object", + "properties": { + "message": { "type": "string" }, + "eventDate": { "type": "integer" } + }, + "required": ["message"], + "additionalProperties": false +} diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/processedEvents.schema.json b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/processedEvents.schema.json new file mode 100644 index 00000000000..cfcf28f0a03 --- /dev/null +++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/processedEvents.schema.json @@ -0,0 +1,11 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema", + "type": "object", + "properties": { + "clientId": { "type": "string" }, + "amount": { "type": "integer" }, + "isLast": { "type": "boolean", "default": false } + }, + "required": ["clientId", "amount"], + "additionalProperties": false +} diff --git a/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/transactions.schema.json b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/transactions.schema.json new file mode 100644 index 00000000000..cfcf28f0a03 --- /dev/null +++ b/benchmarks/src/test/resources/e2e/benchmark-flink/setup/schema-registry/transactions.schema.json @@ -0,0 +1,11 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema", + "type": "object", + "properties": { + "clientId": { "type": "string" }, + "amount": { "type": "integer" }, + "isLast": { "type": "boolean", "default": false } + }, + "required": ["clientId", "amount"], + "additionalProperties": false +} diff --git a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/BaseE2EBenchmark.scala b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/BaseE2EBenchmark.scala new file mode 100644 index 00000000000..1b5f9c1003f --- /dev/null +++ b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/BaseE2EBenchmark.scala @@ -0,0 +1,31 @@ +package pl.touk.nussknacker.engine.benchmarks.e2e + +import com.typesafe.scalalogging.LazyLogging +import pl.touk.nussknacker.engine.version.BuildInfo +import pl.touk.nussknacker.test.installationexample.{ + DockerBasedInstallationExampleClient, + DockerBasedInstallationExampleNuEnvironment +} +import pl.touk.nussknacker.test.MiscUtils._ + +// Before running benchmarks in this module, a fresh docker image should be built from sources and placed in the local +// registry. If you run tests based on this trait in Intellij Idea and the images is not built, you can do it manually: +// `bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt dist/Docker/publishLocal"` +trait BaseE2EBenchmark { + + val client: DockerBasedInstallationExampleClient = + BaseE2EBenchmark.dockerBasedInstallationExampleNuEnvironmentSingleton.client + +} + +object BaseE2EBenchmark extends LazyLogging { + + val dockerBasedInstallationExampleNuEnvironmentSingleton = + new DockerBasedInstallationExampleNuEnvironment( + nussknackerImageVersion = BuildInfo.version, + dockerComposeTweakFiles = List( + BaseE2EBenchmark.getClass.getResourceAsStream("/benchmark-setup.override.yml").toFile + ) + ) + +} diff --git a/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/FlinkSteamingScenarioBenchmark.scala b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/FlinkSteamingScenarioBenchmark.scala new file mode 100644 index 00000000000..b8836e0c2e4 --- /dev/null +++ b/benchmarks/src/test/scala/pl/touk/nussknacker/engine/benchmarks/e2e/FlinkSteamingScenarioBenchmark.scala @@ -0,0 +1,115 @@ +package pl.touk.nussknacker.engine.benchmarks.e2e + +import better.files.File.root +import cats.effect.{ExitCode, IO, IOApp} +import com.typesafe.scalalogging.LazyLogging + +import java.time.{Duration, Instant} +import scala.concurrent.duration._ +import scala.util.{Random, Try} + +object FlinkSteamingScenarioBenchmark extends IOApp with BaseE2EBenchmark with LazyLogging { + + override def run(args: List[String]): IO[ExitCode] = { + for { + messagesCount <- readBenchmarkMessagesCount(args) + _ <- log("Starting message generation...") + prepareDataDuration <- measure(generateBenchmarkMessages(messagesCount)) + _ <- log("Generation finished!") + _ <- log("Starting benchmark scenario...") + runScenarioDuration <- measure(runScenarioAndStartProcessing()) + _ <- log("Scenario started. Messages are processing...") + verifyResultDuration <- measure(waitForProcessingFinish()) + _ <- log(s"All messages processed.") + _ <- saveMeasurementsToFile(prepareDataDuration, runScenarioDuration, verifyResultDuration) + _ <- log(s"Results:") + _ <- log(s"- Preparing data: ${prepareDataDuration.duration.toSeconds} seconds") + _ <- log(s"- Running scenario: ${runScenarioDuration.duration.toSeconds} seconds") + _ <- log(s"- Verifying result: ${verifyResultDuration.duration.toSeconds} seconds") + } yield ExitCode.Success + } + + private def readBenchmarkMessagesCount(args: List[String]) = IO.delay { + args.headOption match { + case Some(arg) => + Try(arg.toInt) + .getOrElse(throw new IllegalArgumentException("Invalid number format. Please provide a valid integer.")) + case None => + throw new IllegalArgumentException("No arguments provided. Please provide an integer argument.") + } + } + + private def generateBenchmarkMessages(messagesCount: Int) = IO.delay { + val batch = 2000 + val noOfBatches = Math.ceil(messagesCount / batch).toInt + (1 to noOfBatches) + .foreach { batchId => + if (batchId % 10 == 0) logger.info(s"Generated ${"%.2f".format(batchId * 100.0 / noOfBatches)}% of messages...") + sendTestMessages(batchId, batch) + } + client.sendMessageToKafka("transactions", generateMessage(s"${messagesCount + 1}", last = true)) + } + + private def runScenarioAndStartProcessing() = IO.delay { + client.deployAndWaitForRunningState("DetectLargeTransactions") + } + + private def waitForProcessingFinish(): IO[Unit] = { + def foundLastRequest(): IO[Boolean] = IO.delay { + client + .readAllMessagesFromKafka("alerts") + .exists(json => json.obj.get("message").exists(_.str == "Last request")) + } + + foundLastRequest() + .flatMap { + case false => + IO + .sleep(100 millis) + .flatMap(_ => waitForProcessingFinish()) + case true => + IO.pure(()) + } + } + + private def saveMeasurementsToFile( + prepareData: Measured[_], + runScenario: Measured[_], + verifyResult: Measured[_] + ): IO[Unit] = IO.delay { + def format(measured: Measured[_]) = "%.2f".format(measured.duration.toMillis.toDouble) + + (root / "tmp" / "benchmarkResult.csv") + .createFileIfNotExists() + .clear() + .appendLine(s"prepareData,${format(prepareData)}") + .appendLine(s"runScenario,${format(runScenario)}") + .appendLine(s"verifyResult,${format(verifyResult)}") + .appendLine() + } + + private def measure[T](action: IO[T]): IO[Measured[T]] = { + for { + started <- IO.delay(Instant.now()) + result <- action + duration <- IO.delay(Duration.between(started, Instant.now())) + } yield Measured(result, duration) + } + + private def sendTestMessages(batchId: Int, batchSize: Int): Unit = { + client.sendMessagesToKafka( + "transactions", + (0 until batchSize).map { id => generateMessage(s"${batchId + id}", last = false) } + ) + } + + private def generateMessage(id: String, last: Boolean) = ujson.Obj( + "amount" -> (Random.nextInt(1000) + 1), + "clientId" -> s"$id", + "isLast" -> last + ) + + private def log(message: => String) = IO.delay(logger.info(message)) +} + +final case class Measured[T](result: T, duration: Duration) diff --git a/build.sbt b/build.sbt index ca61525287b..b5a89adfb63 100644 --- a/build.sbt +++ b/build.sbt @@ -834,8 +834,9 @@ lazy val benchmarks = (project in file("benchmarks")) name := "nussknacker-benchmarks", libraryDependencies ++= { Seq( - "org.apache.flink" % "flink-streaming-java" % flinkV exclude ("com.esotericsoftware", "kryo-shaded"), - "org.apache.flink" % "flink-runtime" % flinkV + "org.apache.flink" % "flink-streaming-java" % flinkV exclude ("com.esotericsoftware", "kryo-shaded"), + "org.apache.flink" % "flink-runtime" % flinkV, + "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV % Test, ) }, Jmh / run / javaOptions ++= ( @@ -851,6 +852,13 @@ lazy val benchmarks = (project in file("benchmarks")) Jmh / dependencyClasspath := (Test / dependencyClasspath).value, Jmh / generateJmhSourcesAndResources := (Jmh / generateJmhSourcesAndResources).dependsOn(Test / compile).value, ) + .settings { + // TODO: it'd be better to use scalaVersion here, but for some reason it's hard to disable existing task dynamically + forScalaVersion(defaultScalaV) { + case (2, 12) => doExecuteMainFromTestSources + case (2, 13) => executeMainFromTestSourcesNotSupported + } + } .dependsOn( designer, extensionsApi, @@ -862,6 +870,20 @@ lazy val benchmarks = (project in file("benchmarks")) testUtils % Test ) +lazy val doExecuteMainFromTestSources = Seq( + (Test / runMain) := (Test / runMain) + .dependsOn(distribution / Docker / publishLocal) + .evaluated +) + +lazy val executeMainFromTestSourcesNotSupported = Seq( + (Test / runMain) := { + streams.value.log.info( + "E2E benchmarks are skipped for Scala 2.13 because Nu installation example is currently based on Scala 2.12" + ) + } +) + lazy val kafkaUtils = (project in utils("kafka-utils")) .settings(commonSettings) .settings( @@ -1109,24 +1131,26 @@ lazy val testUtils = (project in utils("test-utils")) name := "nussknacker-test-utils", libraryDependencies ++= { Seq( - "org.scalatest" %% "scalatest" % scalaTestV, - "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV, - "com.typesafe" % "config" % configV, - "org.typelevel" %% "cats-core" % catsV, - "ch.qos.logback" % "logback-classic" % logbackV, - "org.springframework" % "spring-jcl" % springV, - "commons-io" % "commons-io" % flinkCommonsIOV, - "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV, - "com.softwaremill.sttp.client3" %% "slf4j-backend" % sttpV, - "org.typelevel" %% "cats-effect" % catsEffectV, - "io.circe" %% "circe-parser" % circeV, - "org.testcontainers" % "testcontainers" % testContainersJavaV, - "com.lihaoyi" %% "ujson" % ujsonV, + "com.github.pathikrit" %% "better-files" % betterFilesV, + "org.scalatest" %% "scalatest" % scalaTestV, + "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingV, + "com.typesafe" % "config" % configV, + "org.typelevel" %% "cats-core" % catsV, + "ch.qos.logback" % "logback-classic" % logbackV, + "org.springframework" % "spring-jcl" % springV, + "commons-io" % "commons-io" % flinkCommonsIOV, + "org.scala-lang.modules" %% "scala-collection-compat" % scalaCollectionsCompatV, + "com.softwaremill.sttp.client3" %% "slf4j-backend" % sttpV, + "org.typelevel" %% "cats-effect" % catsEffectV, + "io.circe" %% "circe-parser" % circeV, + "org.testcontainers" % "testcontainers" % testContainersJavaV, + "com.dimafeng" %% "testcontainers-scala-core" % testContainersScalaV, + "com.lihaoyi" %% "ujson" % ujsonV, // This lib produces more descriptive errors during validation than everit - "com.networknt" % "json-schema-validator" % "1.4.0", - "com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV, - "com.softwaremill.sttp.tapir" %% "tapir-apispec-docs" % tapirV, - "com.softwaremill.sttp.apispec" %% "openapi-circe-yaml" % openapiCirceYamlV, + "com.networknt" % "json-schema-validator" % "1.4.0", + "com.softwaremill.sttp.tapir" %% "tapir-core" % tapirV, + "com.softwaremill.sttp.tapir" %% "tapir-apispec-docs" % tapirV, + "com.softwaremill.sttp.apispec" %% "openapi-circe-yaml" % openapiCirceYamlV, ) ++ restAssuredDependency(scalaVersion.value) } ) diff --git a/e2e-tests/src/test/resources/spec-setup/batch-config/batch-customizations.conf b/e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/batch-config/batch-customizations.conf rename to e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf diff --git a/e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql b/e2e-tests/src/test/resources/batch-data-generation/tables-definition.sql similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql rename to e2e-tests/src/test/resources/batch-data-generation/tables-definition.sql diff --git a/e2e-tests/src/test/resources/spec-setup/batch-config/transactions/transactions.ndjson b/e2e-tests/src/test/resources/batch-data-generation/transactions/transactions.ndjson similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/batch-config/transactions/transactions.ndjson rename to e2e-tests/src/test/resources/batch-data-generation/transactions/transactions.ndjson diff --git a/e2e-tests/src/test/resources/batch-nu-designer.override.yml b/e2e-tests/src/test/resources/batch-nu-designer.override.yml new file mode 100644 index 00000000000..4819e8a33f9 --- /dev/null +++ b/e2e-tests/src/test/resources/batch-nu-designer.override.yml @@ -0,0 +1,10 @@ +services: + + designer: + environment: + CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf" + TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql" + volumes: + - ../../e2e-tests/src/test/resources/batch-data-generation/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf + - ../../e2e-tests/src/test/resources/batch-data-generation/transactions:/transactions + - ../../e2e-tests/src/test/resources/batch-data-generation/tables-definition.sql:/opt/nussknacker/conf/tables-definition.sql diff --git a/e2e-tests/src/test/resources/bootstrap-setup-scenarios.override.yml b/e2e-tests/src/test/resources/bootstrap-setup-scenarios.override.yml new file mode 100644 index 00000000000..6e1c1f81ab0 --- /dev/null +++ b/e2e-tests/src/test/resources/bootstrap-setup-scenarios.override.yml @@ -0,0 +1,6 @@ +services: + + bootstrap-setup: + volumes: + - ../../e2e-tests/src/test/resources/detect-large-transactions:/scenario-examples/detect-large-transactions + - ../../e2e-tests/src/test/resources/determine-offered-plan:/scenario-examples/determine-offered-plan diff --git a/e2e-tests/src/test/resources/spec-setup/debuggable-nu-designer.override.yml b/e2e-tests/src/test/resources/debuggable-nu-designer.override.yml similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/debuggable-nu-designer.override.yml rename to e2e-tests/src/test/resources/debuggable-nu-designer.override.yml diff --git a/e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetectLargeTransactions.json b/e2e-tests/src/test/resources/detect-large-transactions/DetectLargeTransactions.json similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetectLargeTransactions.json rename to e2e-tests/src/test/resources/detect-large-transactions/DetectLargeTransactions.json diff --git a/e2e-tests/src/test/resources/detect-large-transactions/setup/kafka/topics.txt b/e2e-tests/src/test/resources/detect-large-transactions/setup/kafka/topics.txt new file mode 100644 index 00000000000..8efc1cf5ea0 --- /dev/null +++ b/e2e-tests/src/test/resources/detect-large-transactions/setup/kafka/topics.txt @@ -0,0 +1,2 @@ +ProcessedTransactions +Transactions diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/ProcessedTransactions.schema.json b/e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/ProcessedTransactions.schema.json similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/ProcessedTransactions.schema.json rename to e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/ProcessedTransactions.schema.json diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Transactions.schema.json b/e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/Transactions.schema.json similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Transactions.schema.json rename to e2e-tests/src/test/resources/detect-large-transactions/setup/schema-registry/Transactions.schema.json diff --git a/e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetermineOfferedPlan.json b/e2e-tests/src/test/resources/determine-offered-plan/DetermineOfferedPlan.json similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/data/nu/scenarios/DetermineOfferedPlan.json rename to e2e-tests/src/test/resources/determine-offered-plan/DetermineOfferedPlan.json diff --git a/e2e-tests/src/test/resources/determine-offered-plan/setup/kafka/topics.txt b/e2e-tests/src/test/resources/determine-offered-plan/setup/kafka/topics.txt new file mode 100644 index 00000000000..2fe924ad72f --- /dev/null +++ b/e2e-tests/src/test/resources/determine-offered-plan/setup/kafka/topics.txt @@ -0,0 +1,2 @@ +Customers +SmsesWithOffer diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Customers.schema.json b/e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/Customers.schema.json similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/Customers.schema.json rename to e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/Customers.schema.json diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/SmsesWithOffer.schema.json b/e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/SmsesWithOffer.schema.json similarity index 100% rename from e2e-tests/src/test/resources/spec-setup/data/schema-registry/schemas/SmsesWithOffer.schema.json rename to e2e-tests/src/test/resources/determine-offered-plan/setup/schema-registry/SmsesWithOffer.schema.json diff --git a/e2e-tests/src/test/resources/spec-setup/Dockerfile b/e2e-tests/src/test/resources/spec-setup/Dockerfile deleted file mode 100644 index dbdb4a32f3b..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM bitnami/kafka:3.7.0 - -USER root - -RUN apt update && \ - apt install -y curl jq uuid-runtime kafkacat && \ - rm -rf /var/lib/apt/lists/* - -WORKDIR /app - -COPY entrypoint.sh / -COPY scripts/ /app/scripts/ -COPY data/ /app/data/ - -ENTRYPOINT [ "/entrypoint.sh" ] diff --git a/e2e-tests/src/test/resources/spec-setup/batch-nu-designer.override.yml b/e2e-tests/src/test/resources/spec-setup/batch-nu-designer.override.yml deleted file mode 100644 index 16deafd9cad..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/batch-nu-designer.override.yml +++ /dev/null @@ -1,10 +0,0 @@ -services: - - designer: - environment: - CONFIG_FILE: "/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/application-customizations.conf,/opt/nussknacker/conf/batch-customizations.conf" - TABLES_DEFINITION_FILE: "/opt/nussknacker/conf/tables-definition.sql" - volumes: - - ../../e2e-tests/src/test/resources/spec-setup/batch-config/batch-customizations.conf:/opt/nussknacker/conf/batch-customizations.conf - - ../../e2e-tests/src/test/resources/spec-setup/batch-config/transactions:/transactions - - ../../e2e-tests/src/test/resources/spec-setup/batch-config/tables-definition.sql:/opt/nussknacker/conf/tables-definition.sql diff --git a/e2e-tests/src/test/resources/spec-setup/data/kafka/messages/transactions.txt b/e2e-tests/src/test/resources/spec-setup/data/kafka/messages/transactions.txt deleted file mode 100644 index 91adff0941d..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/data/kafka/messages/transactions.txt +++ /dev/null @@ -1,3 +0,0 @@ -# Example messages below (message per line) -# { "one": 1, "two": 2 } -# { "one": 1, "three": 3 } diff --git a/e2e-tests/src/test/resources/spec-setup/data/kafka/topics.txt b/e2e-tests/src/test/resources/spec-setup/data/kafka/topics.txt deleted file mode 100644 index 803c44669cf..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/data/kafka/topics.txt +++ /dev/null @@ -1,5 +0,0 @@ -# Example topic names below (topic name per line) -Customers -ProcessedTransactions -SmsesWithOffer -Transactions diff --git a/e2e-tests/src/test/resources/spec-setup/data/nu/examples.txt b/e2e-tests/src/test/resources/spec-setup/data/nu/examples.txt deleted file mode 100644 index 2e592de900c..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/data/nu/examples.txt +++ /dev/null @@ -1,3 +0,0 @@ -# Scenarios to put as examples (file with scenario from the scenarios folder per line) -DetectLargeTransactions.json -DetermineOfferedPlan.json diff --git a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/active-schemas.txt b/e2e-tests/src/test/resources/spec-setup/data/schema-registry/active-schemas.txt deleted file mode 100644 index ac31c52c373..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/data/schema-registry/active-schemas.txt +++ /dev/null @@ -1,5 +0,0 @@ -# JSON schemas to load (JSON schema file name from the schemas folder per line) -Customers.schema.json -ProcessedTransactions.schema.json -SmsesWithOffer.schema.json -Transactions.schema.json diff --git a/e2e-tests/src/test/resources/spec-setup/entrypoint.sh b/e2e-tests/src/test/resources/spec-setup/entrypoint.sh deleted file mode 100755 index c0127828bf7..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/entrypoint.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash -e - -# setup other containers using auto-executed scripts -while IFS= read -r script; do - "$script" -done < <(find /app/scripts/auto-executed -type f -name '*.sh' | sort) - -echo "Setup done!" -# loop forever (you can use manually called utils scripts now) -tail -f /dev/null diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/001-setup-schemas.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/001-setup-schemas.sh deleted file mode 100755 index 1ced04b8679..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/001-setup-schemas.sh +++ /dev/null @@ -1,58 +0,0 @@ -#!/bin/bash -e - -cd "$(dirname "$0")" - -function createJsonSchema() { - if [ "$#" -ne 2 ]; then - echo "Error: Two parameters required: 1) schema name, 2) schema file path" - exit 11 - fi - - set -e - - local SCHEMA_NAME=$1 - local SCHEMA_FILE=$2 - - echo "Creating schema '$SCHEMA_NAME' ..." - ESCAPED_JSON_SCHEMA=$(awk 'BEGIN{ORS="\\n"} {gsub(/"/, "\\\"")} 1' < "$SCHEMA_FILE") - - local REQUEST_BODY="{ - \"schema\": \"$ESCAPED_JSON_SCHEMA\", - \"schemaType\": \"JSON\", - \"references\": [] - }" - - local RESPONSE - RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \ - -X POST "http://schema-registry:8081/subjects/${SCHEMA_NAME}/versions" \ - -H "Content-Type: application/vnd.schemaregistry.v1+json" -d "$REQUEST_BODY" - ) - - local HTTP_STATUS - HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1) - - if [[ "$HTTP_STATUS" != 200 ]] ; then - local RESPONSE_BODY - RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) - echo -e "Error: Cannot create schema $SCHEMA_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY" - exit 12 - fi - - echo "Schema '$SCHEMA_NAME' created!" -} - -echo "Starting to add preconfigured schemas ..." - -while IFS= read -r SCHEMA_FILENAME; do - - if [[ $SCHEMA_FILENAME == "#"* ]]; then - continue - fi - - SCHEMA_NAME="$(basename "$SCHEMA_FILENAME" ".schema.json")-value" - createJsonSchema "$SCHEMA_NAME" "$(realpath ../../data/schema-registry/schemas/"$SCHEMA_FILENAME")" - -done < "../../data/schema-registry/active-schemas.txt" - - -echo "DONE!" diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/002-setup-topics.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/002-setup-topics.sh deleted file mode 100755 index dc44807d46d..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/002-setup-topics.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash -e - -cd "$(dirname "$0")" - -function createTopic() { - if [ "$#" -ne 1 ]; then - echo "Error: One parameter required: 1) topic name" - exit 11 - fi - - set -e - local TOPIC_NAME=$1 - - echo "Creating topic '$TOPIC_NAME'" - /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 --topic "$TOPIC_NAME" - echo "Creating topic '$TOPIC_NAME'" -} - -echo "Starting to create preconfigured topics ..." - -while IFS= read -r TOPIC_NAME; do - - if [[ $TOPIC_NAME == "#"* ]]; then - continue - fi - - createTopic "$TOPIC_NAME" - -done < "../../data/kafka/topics.txt" - -echo "DONE!" diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/003-import-and-deploy-example-scenarios.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/003-import-and-deploy-example-scenarios.sh deleted file mode 100755 index a583cc18ff5..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/003-import-and-deploy-example-scenarios.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/bin/bash -e - -cd "$(dirname "$0")" - -function importAndDeployScenario() { - if [ "$#" -ne 2 ]; then - echo "Error: Two parameters required: 1) scenario name, 2) example scenario file path" - exit 11 - fi - - set -e - - local EXAMPLE_SCENARIO_NAME=$1 - local EXAMPLE_SCENARIO_FILE=$2 - - ../utils/nu/load-scenario-from-json-file.sh "$EXAMPLE_SCENARIO_NAME" "$EXAMPLE_SCENARIO_FILE" - ../utils/nu/deploy-scenario-and-wait-for-running-state.sh "$EXAMPLE_SCENARIO_NAME" -} - -echo "Starting to import and deploy example scenarios ..." - -while IFS= read -r EXAMPLE_SCENARIO_FILENAME; do - - if [[ $EXAMPLE_SCENARIO_FILENAME == "#"* ]]; then - continue - fi - - EXAMPLE_SCENARIO_NAME=$(basename "$EXAMPLE_SCENARIO_FILENAME" ".json") - - importAndDeployScenario "$EXAMPLE_SCENARIO_NAME" "$(realpath ../../data/nu/scenarios/"$EXAMPLE_SCENARIO_FILENAME")" - -done < "../../data/nu/examples.txt" - -echo "DONE!" diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/004-send-kafka-messages.sh b/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/004-send-kafka-messages.sh deleted file mode 100755 index cb22d3cf61e..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/auto-executed/004-send-kafka-messages.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash -e - -cd "$(dirname "$0")" - -function sendMessage() { - if [ "$#" -ne 2 ]; then - echo "Error: Two parameters required: 1) topic name, 2) message" - exit 11 - fi - - set -e - - local TOPIC_NAME=$1 - local MSG=$2 - - echo "Sending message $MSG to '$TOPIC_NAME'" - ../utils/kafka/send-to-topic.sh "$TOPIC_NAME" "$MSG" - echo "Message sent!" -} - -echo "Starting to send preconfigured messages ..." - -for FILE in "../../data/kafka/messages"/*; do - if [ -f "$FILE" ]; then - TOPIC_NAME=$(basename "$FILE") - - while IFS= read -r MSG; do - if [[ $MSG == "#"* ]]; then - continue - fi - - sendMessage "$TOPIC_NAME" "$MSG" - done < "$FILE" - fi -done - -echo "DONE!" diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/purge-topic.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/purge-topic.sh deleted file mode 100755 index c925330e46b..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/purge-topic.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -e - -if [ "$#" -ne 1 ]; then - echo "One parameter required: 1) topic name" - exit 1 -fi - -cd "$(dirname "$0")" - -TOPIC_NAME=$1 -DELETE_TOPIC_ORDER_FILE="/tmp/delete-$TOPIC_NAME.json" - -trap 'rm "$DELETE_TOPIC_ORDER_FILE"' EXIT - -echo "{ \"partitions\": [{ \"topic\": \"$TOPIC_NAME\", \"partition\": 0, \"offset\": -1 }], \"version\": 1 }" > "$DELETE_TOPIC_ORDER_FILE" - -/opt/bitnami/kafka/bin/kafka-delete-records.sh --bootstrap-server kafka:9092 -offset-json-file "$DELETE_TOPIC_ORDER_FILE" diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/read-from-topic.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/read-from-topic.sh deleted file mode 100755 index 7501eb071d6..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/read-from-topic.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -e - -if [ "$#" -ne 1 ]; then - echo "One parameter required: 1) topic name" - exit 1 -fi - -cd "$(dirname "$0")" - -TOPIC_NAME=$1 - -kcat -C -b kafka:9092 -t "$TOPIC_NAME" -o beginning -e -q diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/send-to-topic.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/send-to-topic.sh deleted file mode 100755 index 0d387168b38..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/kafka/send-to-topic.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash -e - -if [ "$#" -ne 2 ]; then - echo "Two parameters required: 1) topic name, 2) message" - exit 1 -fi - -cd "$(dirname "$0")" - -TOPIC=$1 -MESSAGE=$2 - -echo "$MESSAGE" | /opt/bitnami/kafka/bin/kafka-console-producer.sh --topic "$TOPIC" --bootstrap-server kafka:9092 diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh deleted file mode 100755 index 17f3a67ce9b..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh +++ /dev/null @@ -1,96 +0,0 @@ -#!/bin/bash -e - -cd "$(dirname "$0")" - -if [ "$#" -lt 1 ]; then - echo "Error: One parameter required: 1) scenario name" - exit 1 -fi - -SCENARIO_NAME=$1 -TIMEOUT_SECONDS=${2:-60} -WAIT_INTERVAL=5 - -function deployScenario() { - if [ "$#" -ne 1 ]; then - echo "Error: One parameter required: 1) scenario name" - exit 11 - fi - - set -e - - local SCENARIO_NAME=$1 - - local RESPONSE - RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \ - -X POST "http://nginx:8080/api/processManagement/deploy/$SCENARIO_NAME" - ) - - local HTTP_STATUS - HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1) - - if [ "$HTTP_STATUS" != "200" ]; then - local RESPONSE_BODY - RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) - echo -e "Error: Cannot run scenario $SCENARIO_NAME deployment.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY" - exit 12 - fi - - echo "Scenario $SCENARIO_NAME deployment started ..." -} - -function checkDeploymentStatus() { - if [ "$#" -ne 1 ]; then - echo "Error: One parameter required: 1) scenario name" - exit 21 - fi - - set -e - - local SCENARIO_NAME=$1 - - local RESPONSE - RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \ - -X GET "http://nginx:8080/api/processes/$SCENARIO_NAME/status" - ) - - local HTTP_STATUS - HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1) - local RESPONSE_BODY - RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) - - if [ "$HTTP_STATUS" != "200" ]; then - echo -e "Error: Cannot check scenario $SCENARIO_NAME deployment status.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY" - exit 22 - fi - - local SCENARIO_STATUS - SCENARIO_STATUS=$(echo "$RESPONSE_BODY" | jq -r '.status.name') - echo "$SCENARIO_STATUS" -} - -echo "Deploying scenario $SCENARIO_NAME ..." - -START_TIME=$(date +%s) -END_TIME=$((START_TIME + TIMEOUT_SECONDS)) - -deployScenario "$SCENARIO_NAME" - -while true; do - DEPLOYMENT_STATUS=$(checkDeploymentStatus "$SCENARIO_NAME") - - if [ "$DEPLOYMENT_STATUS" == "RUNNING" ]; then - break - fi - - CURRENT_TIME=$(date +%s) - if [ $CURRENT_TIME -gt $END_TIME ]; then - echo "Error: Timeout for waiting for the RUNNING state of $SCENARIO_NAME deployment reached!" - exit 2 - fi - - echo "$SCENARIO_NAME deployment state is $DEPLOYMENT_STATUS. Checking again in $WAIT_INTERVAL seconds..." - sleep $WAIT_INTERVAL -done - -echo "Scenario $SCENARIO_NAME is RUNNING!" diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json-file.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json-file.sh deleted file mode 100755 index 03bb3276777..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json-file.sh +++ /dev/null @@ -1,150 +0,0 @@ -#!/bin/bash -e - -cd "$(dirname "$0")" - -if [ "$#" -lt 2 ]; then - echo "Error: Two parameters required: 1) scenario name, 2) scenario file path" - exit 1 -fi - -SCENARIO_NAME=$1 -SCENARIO_FILE_PATH=$2 -CATEGORY=${3:-"Default"} - -if [ ! -f "$SCENARIO_FILE_PATH" ]; then - echo "Error: Cannot find file $SCENARIO_FILE_PATH with scenario" - exit 2 -fi - -function createEmptyScenario() { - if [ "$#" -ne 4 ]; then - echo "Error: Four parameters required: 1) scenario name, 2) processing mode, 3) category, 4) engine" - exit 11 - fi - - set -e - - local SCENARIO_NAME=$1 - local PROCESSING_MODE=$2 - local CATEGORY=$3 - local ENGINE=$4 - - local REQUEST_BODY="{ - \"name\": \"$SCENARIO_NAME\", - \"processingMode\": \"$PROCESSING_MODE\", - \"category\": \"$CATEGORY\", - \"engineSetupName\": \"$ENGINE\", - \"isFragment\": false - }" - - local RESPONSE - RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \ - -X POST "http://nginx:8080/api/processes" \ - -H "Content-Type: application/json" -d "$REQUEST_BODY" - ) - - local HTTP_STATUS - HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1) - - if [ "$HTTP_STATUS" != "201" ]; then - local RESPONSE_BODY - RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) - echo -e "Error: Cannot create empty scenario $SCENARIO_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY" - exit 12 - fi - - echo "Empty scenario $SCENARIO_NAME created successfully." -} - -function importScenarioFromFile() { - if [ "$#" -ne 2 ]; then - echo "Error: Two parameters required: 1) scenario name, 2) scenario file path" - exit 21 - fi - - set -e - - local SCENARIO_NAME=$1 - local SCENARIO_FILE=$2 - - local RESPONSE - RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \ - -X POST "http://nginx:8080/api/processes/import/$SCENARIO_NAME" \ - -F "process=@$SCENARIO_FILE" - ) - - # Check response body and status code - local HTTP_STATUS - HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1) - - local RESPONSE_BODY - RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) - - if [ "$HTTP_STATUS" == "200" ]; then - local SCENARIO_GRAPH - SCENARIO_GRAPH=$(echo "$RESPONSE_BODY" | jq '.scenarioGraph') - echo "$SCENARIO_GRAPH" - else - echo -e "Error: Cannot import scenario $SCENARIO_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY" - exit 22 - fi -} - -function saveScenario() { - if [ "$#" -ne 2 ]; then - echo "Error: Two parameters required: 1) scenario name, 2) scenario graph JSON representation" - exit 31 - fi - - set -e - - local SCENARIO_NAME=$1 - local SCENARIO_GRAPH_JSON=$2 - - local REQUEST_BODY="{ - \"scenarioGraph\": $SCENARIO_GRAPH_JSON, - \"comment\": \"\" - }" - - local RESPONSE - RESPONSE=$(curl -s -L -w "\n%{http_code}" -u admin:admin \ - -X PUT "http://nginx:8080/api/processes/$SCENARIO_NAME" \ - -H "Content-Type: application/json" -d "$REQUEST_BODY" - ) - - local HTTP_STATUS - HTTP_STATUS=$(echo "$RESPONSE" | tail -n 1) - - if [ "$HTTP_STATUS" != "200" ]; then - local RESPONSE_BODY - RESPONSE_BODY=$(echo "$RESPONSE" | sed \$d) - echo -e "Error: Cannot save scenario $SCENARIO_NAME.\nHTTP status: $HTTP_STATUS, response body: $RESPONSE_BODY" - exit 32 - fi - - echo "Scenario $SCENARIO_NAME saved successfully." -} - -META_DATA_TYPE=$(jq -r .metaData.additionalFields.metaDataType < "$SCENARIO_FILE_PATH") -case "$META_DATA_TYPE" in - "StreamMetaData") - ENGINE="Flink" - PROCESSING_MODE="Unbounded-Stream" - ;; - "LiteStreamMetaData") - ENGINE="Lite K8s" - PROCESSING_MODE="Unbounded-Stream" - ;; - "RequestResponseMetaData") - ENGINE="Lite K8s" - PROCESSING_MODE="Request-Response" - ;; - *) - echo "Error: Cannot import scenario with metadata type: $META_DATA_TYPE" - exit 2 - ;; -esac - -createEmptyScenario "$SCENARIO_NAME" "$PROCESSING_MODE" "$CATEGORY" "$ENGINE" -SCENARIO_GRAPH=$(importScenarioFromFile "$SCENARIO_NAME" "$SCENARIO_FILE_PATH") -saveScenario "$SCENARIO_NAME" "$SCENARIO_GRAPH" diff --git a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json.sh b/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json.sh deleted file mode 100755 index 8e80c5e75e9..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/scripts/utils/nu/load-scenario-from-json.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash -e - -cd "$(dirname "$0")" - -if [ "$#" -lt 2 ]; then - echo "Error: Two parameters required: 1) scenario name, 2) scenario JSON" - exit 1 -fi - -SCENARIO_NAME=$1 -SCENARIO_JSON=$2 -SCENARIO_JSON_FILE="/tmp/scenario-$SCENARIO_NAME.json" - -echo "$SCENARIO_JSON" > "$SCENARIO_JSON_FILE" -trap 'rm "$SCENARIO_JSON_FILE"' EXIT - -./load-scenario-from-json-file.sh "$SCENARIO_NAME" "$SCENARIO_JSON_FILE" diff --git a/e2e-tests/src/test/resources/spec-setup/spec-setup.override.yml b/e2e-tests/src/test/resources/spec-setup/spec-setup.override.yml deleted file mode 100644 index 0be16dc9ec8..00000000000 --- a/e2e-tests/src/test/resources/spec-setup/spec-setup.override.yml +++ /dev/null @@ -1,9 +0,0 @@ -services: - - spec-setup: - build: - context: ../../e2e-tests/src/test/resources/spec-setup/ - dockerfile: Dockerfile - depends_on: - nginx: - condition: service_healthy diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/BaseE2eSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/BaseE2eSpec.scala new file mode 100644 index 00000000000..d4e1490639c --- /dev/null +++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/BaseE2eSpec.scala @@ -0,0 +1,36 @@ +package pl.touk.nussknacker + +import better.files._ +import com.typesafe.scalalogging.LazyLogging +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite} +import pl.touk.nussknacker.engine.version.BuildInfo +import pl.touk.nussknacker.test.installationexample.{ + DockerBasedInstallationExampleClient, + DockerBasedInstallationExampleNuEnvironment +} + +import java.io.{File => JFile} + +// Before running tests in this module, a fresh docker image should be built from sources and placed in the local +// registry. If you run tests based on this trait in Intellij Idea and the images is not built, you can do it manually: +// `bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt dist/Docker/publishLocal"` +trait BaseE2eSpec extends BeforeAndAfterAll with BeforeAndAfterEach with LazyLogging { + this: Suite => + + val client: DockerBasedInstallationExampleClient = + BaseE2eSpec.dockerBasedInstallationExampleNuEnvironmentSingleton.client +} + +object BaseE2eSpec extends LazyLogging { + + val dockerBasedInstallationExampleNuEnvironmentSingleton = + new DockerBasedInstallationExampleNuEnvironment( + nussknackerImageVersion = BuildInfo.version, + dockerComposeTweakFiles = List( + new JFile(Resource.getUrl("bootstrap-setup-scenarios.override.yml").toURI), + new JFile(Resource.getUrl("batch-nu-designer.override.yml").toURI), + new JFile(Resource.getUrl("debuggable-nu-designer.override.yml").toURI) + ) + ) + +} diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala index e7b3fc5b299..3017ec0afcc 100644 --- a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala +++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala @@ -13,7 +13,7 @@ import pl.touk.nussknacker.ui.process.marshall.CanonicalProcessConverter.toScena class BatchDataGenerationSpec extends AnyFreeSpecLike - with DockerBasedInstallationExampleNuEnvironment + with BaseE2eSpec with Matchers with VeryPatientScalaFutures with NuRestAssureExtensions diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala index d987ba913b0..e0758e4ddcc 100644 --- a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala +++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetectLargeTransactionSpec.scala @@ -4,11 +4,7 @@ import org.scalatest.freespec.AnyFreeSpecLike import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.test.VeryPatientScalaFutures -class DetectLargeTransactionSpec - extends AnyFreeSpecLike - with DockerBasedInstallationExampleNuEnvironment - with Matchers - with VeryPatientScalaFutures { +class DetectLargeTransactionSpec extends AnyFreeSpecLike with BaseE2eSpec with Matchers with VeryPatientScalaFutures { "Large transactions should be properly detected" in { val smallAmountTransactions = List( @@ -23,18 +19,18 @@ class DetectLargeTransactionSpec ) (smallAmountTransactions ::: largeAmountTransactions).foreach { transaction => - sendMessageToKafka("Transactions", transaction) + client.sendMessageToKafka("Transactions", transaction) } eventually { - val processedTransactions = readAllMessagesFromKafka("ProcessedTransactions") + val processedTransactions = client.readAllMessagesFromKafka("ProcessedTransactions") processedTransactions should equal(largeAmountTransactions) } } override protected def afterEach(): Unit = { - purgeKafkaTopic("Transactions") - purgeKafkaTopic("ProcessedTransactions") + client.purgeKafkaTopic("Transactions") + client.purgeKafkaTopic("ProcessedTransactions") super.afterEach() } diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala index 49d57696cc8..2d5f0ce92ab 100644 --- a/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala +++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/DetermineOfferedPlanSpec.scala @@ -4,11 +4,7 @@ import org.scalatest.freespec.AnyFreeSpecLike import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.test.VeryPatientScalaFutures -class DetermineOfferedPlanSpec - extends AnyFreeSpecLike - with DockerBasedInstallationExampleNuEnvironment - with Matchers - with VeryPatientScalaFutures { +class DetermineOfferedPlanSpec extends AnyFreeSpecLike with BaseE2eSpec with Matchers with VeryPatientScalaFutures { "Properly determine offers for customers" in { val customers = List( @@ -19,11 +15,11 @@ class DetermineOfferedPlanSpec ) customers.foreach { customer => - sendMessageToKafka("Customers", customer) + client.sendMessageToKafka("Customers", customer) } eventually { - val smses = readAllMessagesFromKafka("SmsesWithOffer") + val smses = client.readAllMessagesFromKafka("SmsesWithOffer") smses should equal( List( smsWithOfferJson("Nick", "Junior Package"), @@ -34,8 +30,8 @@ class DetermineOfferedPlanSpec } override protected def afterEach(): Unit = { - purgeKafkaTopic("Customers") - purgeKafkaTopic("SmsesWithOffer") + client.purgeKafkaTopic("Customers") + client.purgeKafkaTopic("SmsesWithOffer") super.afterEach() } diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/DockerBasedInstallationExampleNuEnvironment.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/DockerBasedInstallationExampleNuEnvironment.scala deleted file mode 100644 index 17311c831f2..00000000000 --- a/e2e-tests/src/test/scala/pl/touk/nussknacker/DockerBasedInstallationExampleNuEnvironment.scala +++ /dev/null @@ -1,92 +0,0 @@ -package pl.touk.nussknacker - -import better.files._ -import com.dimafeng.testcontainers._ -import com.typesafe.scalalogging.LazyLogging -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite} -import org.testcontainers.containers.output.Slf4jLogConsumer -import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy -import pl.touk.nussknacker.test.containers.ContainerExt._ -import pl.touk.nussknacker.DockerBasedInstallationExampleNuEnvironment.{JSON, singletonContainer} -import ujson.Value -import pl.touk.nussknacker.engine.version.BuildInfo - -import java.io.{File => JFile} -import java.time.Duration - -// Before running tests in this module, a fresh docker image should be built from sources and placed in the local -// registry. If you run tests based on this trait in Intellij Idea and the images is not built, you can do it manually: -// `bash -c "export NUSSKNACKER_SCALA_VERSION=2.12 && sbt dist/Docker/publishLocal"` -trait DockerBasedInstallationExampleNuEnvironment extends BeforeAndAfterAll with BeforeAndAfterEach with LazyLogging { - this: Suite => - - private val specSetupService = unsafeContainerByServiceName("spec-setup") - - def loadFlinkStreamingScenarioFromResource(scenarioName: String, scenarioJsonFile: File): Unit = { - val escapedScenarioJson = scenarioJsonFile.contentAsString().replaceAll("\"", "\\\\\"") - specSetupService.executeBash( - s"""/app/scripts/utils/nu/load-scenario-from-json.sh "$scenarioName" "$escapedScenarioJson" """ - ) - } - - def deployAndWaitForRunningState(scenarioName: String): Unit = { - specSetupService.executeBash( - s"""/app/scripts/utils/nu/deploy-scenario-and-wait-for-running-state.sh "$scenarioName" """ - ) - } - - def sendMessageToKafka(topic: String, message: JSON): Unit = { - val escapedMessage = message.render().replaceAll("\"", "\\\\\"") - specSetupService.executeBash(s"""/app/scripts/utils/kafka/send-to-topic.sh "$topic" "$escapedMessage" """) - } - - def readAllMessagesFromKafka(topic: String): List[JSON] = { - specSetupService - .executeBashAndReadStdout(s"""/app/scripts/utils/kafka/read-from-topic.sh "$topic" """) - .split("\n") - .toList - .map(ujson.read(_)) - } - - def purgeKafkaTopic(topic: String): Unit = { - specSetupService.executeBash(s"""/app/scripts/utils/kafka/purge-topic.sh "$topic" """) - } - - private def unsafeContainerByServiceName(name: String) = singletonContainer - .getContainerByServiceName(name) - .getOrElse(throw new IllegalStateException(s"'$name' service not available!")) - -} - -object DockerBasedInstallationExampleNuEnvironment extends LazyLogging { - - type JSON = Value - - val singletonContainer: DockerComposeContainer = new DockerComposeContainer( - composeFiles = Seq( - new JFile("examples/installation/docker-compose.yml"), - new JFile(Resource.getUrl("spec-setup/spec-setup.override.yml").toURI), - new JFile(Resource.getUrl("spec-setup/batch-nu-designer.override.yml").toURI), - new JFile(Resource.getUrl("spec-setup/debuggable-nu-designer.override.yml").toURI) - ), - env = Map( - "NUSSKNACKER_VERSION" -> BuildInfo.version - ), - logConsumers = Seq( - ServiceLogConsumer("spec-setup", new Slf4jLogConsumer(logger.underlying)) - ), - waitingFor = Some( - WaitingForService( - "spec-setup", - new LogMessageWaitStrategy() - .withRegEx("^Setup done!.*") - .withStartupTimeout(Duration.ofSeconds(120L)) - ) - ), - // Change to 'true' to enable logging - tailChildContainers = false - ) - - singletonContainer.start() - -} diff --git a/examples/installation/.env b/examples/installation/.env deleted file mode 100644 index 35735ab8847..00000000000 --- a/examples/installation/.env +++ /dev/null @@ -1 +0,0 @@ -NUSSKNACKER_VERSION=1.15.3 diff --git a/nussknacker-dist/src/universal/conf/logback.xml b/nussknacker-dist/src/universal/conf/logback.xml index ce23f616eb6..be2d2f4f5c5 100644 --- a/nussknacker-dist/src/universal/conf/logback.xml +++ b/nussknacker-dist/src/universal/conf/logback.xml @@ -55,5 +55,6 @@ + diff --git a/utils/test-utils/src/main/resources/bootstrap-setup.override.yml b/utils/test-utils/src/main/resources/bootstrap-setup.override.yml new file mode 100644 index 00000000000..c789679792f --- /dev/null +++ b/utils/test-utils/src/main/resources/bootstrap-setup.override.yml @@ -0,0 +1,24 @@ +services: + + bootstrap-setup: + image: touk/nussknacker-example-scenarios-library:0.3.0 + depends_on: + nginx: + condition: service_healthy + designer: + condition: service_healthy + kafka: + condition: service_healthy + schema-registry: + condition: service_healthy + environment: + DISABLE_EMBEDDED_EXAMPLES: true + NU_DESIGNER_ADDRESS: "nginx:8080" + NU_REQUEST_RESPONSE_OPEN_API_SERVICE_ADDRESS: "nginx:8181" + KAFKA_ADDRESS: "kafka:9092" + SCHEMA_REGISTRY_ADDRESS: "schema-registry:8081" + deploy: + resources: + limits: + memory: 256M + cpus: '0.5' diff --git a/utils/test-utils/src/main/resources/logback-test.xml b/utils/test-utils/src/main/resources/logback-test.xml index b36cf2e7ef1..2407b450eb6 100644 --- a/utils/test-utils/src/main/resources/logback-test.xml +++ b/utils/test-utils/src/main/resources/logback-test.xml @@ -63,6 +63,6 @@ - + diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/MiscUtils.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/MiscUtils.scala new file mode 100644 index 00000000000..bc9972034d0 --- /dev/null +++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/MiscUtils.scala @@ -0,0 +1,21 @@ +package pl.touk.nussknacker.test + +import org.apache.commons.io.IOUtils + +import java.io.{File, FileOutputStream, InputStream} + +object MiscUtils { + + implicit class InputStreamOps(val in: InputStream) extends AnyVal { + + def toFile: File = { + val tempFile = File.createTempFile("Nussknacker", null) + tempFile.deleteOnExit() + val out = new FileOutputStream(tempFile); + IOUtils.copy(in, out); + tempFile + } + + } + +} diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala index 96324f4e95f..f6b90cb1ad0 100644 --- a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala +++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/containers/ContainerExt.scala @@ -8,11 +8,11 @@ import scala.language.implicitConversions class ContainerExt(val container: ContainerState) extends LazyLogging { def executeBash(cmd: String): Unit = { - logger.info(executeBashAndReadStdout(cmd)) + executeBashAndReadStdout(cmd) } def executeBashAndReadStdout(cmd: String): String = { - logger.info(s"Calling command '$cmd' on container '${container.getContainerInfo.getName}' ...") + logger.debug(s"Calling command '$cmd' on container '${container.getContainerInfo.getName}' ...") val exitResult = container.execInContainer("bash", "-c", cmd) exitResult.getExitCode match { case 0 => diff --git a/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala new file mode 100644 index 00000000000..53a694e4841 --- /dev/null +++ b/utils/test-utils/src/main/scala/pl/touk/nussknacker/test/installationexample/DockerBasedNuInstallationExampleEnvironment.scala @@ -0,0 +1,90 @@ +package pl.touk.nussknacker.test.installationexample + +import com.dimafeng.testcontainers.{DockerComposeContainer, ServiceLogConsumer, WaitingForService} +import com.typesafe.scalalogging.LazyLogging +import org.slf4j.Logger +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.containers.wait.strategy.DockerHealthcheckWaitStrategy +import pl.touk.nussknacker.test.containers.ContainerExt.toContainerExt +import pl.touk.nussknacker.test.installationexample.DockerBasedInstallationExampleNuEnvironment.{JSON, slf4jLogger} +import ujson.Value +import pl.touk.nussknacker.test.MiscUtils._ + +import java.io.{File => JFile} +import java.time.Duration + +class DockerBasedInstallationExampleNuEnvironment( + nussknackerImageVersion: String, + dockerComposeTweakFiles: Iterable[JFile] +) extends DockerComposeContainer( + composeFiles = new JFile("examples/installation/docker-compose.yml") :: + DockerBasedInstallationExampleNuEnvironment.getClass + .getResourceAsStream("/bootstrap-setup.override.yml") + .toFile :: + dockerComposeTweakFiles.toList, + env = Map( + "NUSSKNACKER_VERSION" -> nussknackerImageVersion + ), + logConsumers = Seq( + ServiceLogConsumer("bootstrap-setup", new Slf4jLogConsumer(slf4jLogger)) + ), + waitingFor = Some( + WaitingForService("bootstrap-setup", new DockerHealthcheckWaitStrategy().withStartupTimeout(Duration.ofSeconds(120))) + ), + // Change to 'true' to enable logging + tailChildContainers = false + ) { + + start() + + val client: DockerBasedInstallationExampleClient = new DockerBasedInstallationExampleClient(this) +} + +object DockerBasedInstallationExampleNuEnvironment extends LazyLogging { + + type JSON = Value + + private def slf4jLogger: Logger = logger.underlying + +} + +class DockerBasedInstallationExampleClient(env: DockerBasedInstallationExampleNuEnvironment) { + + private val bootstrapSetupService = unsafeContainerByServiceName("bootstrap-setup") + + def deployAndWaitForRunningState(scenarioName: String): Unit = { + bootstrapSetupService.executeBash( + s"""/app/utils/nu/deploy-scenario-and-wait-for-running-state.sh "$scenarioName" """ + ) + } + + def sendMessageToKafka(topic: String, message: JSON): Unit = { + sendMessagesToKafka(topic, message :: Nil) + } + + def sendMessagesToKafka(topic: String, messages: Iterable[JSON]): Unit = { + val escapedMessages = messages.map(_.render().replaceAll("\"", "\\\\\"")).mkString("\n") + bootstrapSetupService.executeBash(s"""/app/utils/kafka/send-to-topic.sh "$topic" "$escapedMessages" """) + } + + def readAllMessagesFromKafka(topic: String): List[JSON] = { + bootstrapSetupService + .executeBashAndReadStdout(s"""/app/utils/kafka/read-from-topic.sh "$topic" """) + .split("\n") + .flatMap { + case "" => None + case str => Option(str) + } + .toList + .map(ujson.read(_)) + } + + def purgeKafkaTopic(topic: String): Unit = { + bootstrapSetupService.executeBash(s"""/app/utils/kafka/purge-topic.sh "$topic" """) + } + + private def unsafeContainerByServiceName(name: String) = env + .getContainerByServiceName(name) + .getOrElse(throw new IllegalStateException(s"'$name' service not available!")) + +}