Merge pull request #6886 from TouK/catalog-nodedeploymentdata-fix
FlinkTestScenarioRunner: ability to pass NodesDeploymentData + fix for Apache Iceberg Illegal table name error
arkadius authored Sep 17, 2024
2 parents 662f0d6 + bad6c9d commit bc159b8
Showing 8 changed files with 154 additions and 56 deletions.
2 changes: 2 additions & 0 deletions docs/Changelog.md
@@ -47,6 +47,8 @@
some types of disallowed expressions.
* [#6880](https://github.com/TouK/nussknacker/pull/6880) Performance optimization of generating Avro messages with unions
- shorter message in logs
* [#6886](https://github.com/TouK/nussknacker/pull/6886) Fix for "Illegal table name:$nuCatalog" error when using the Apache Iceberg catalog.
  The internal Nussknacker catalog is now named `_nu_catalog`.

## 1.17

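For orientation before the code diffs: the rename above only changes fully-qualified table references. A minimal sketch of a scenario addressing a table through the renamed internal catalog, using the builder and `.spel` helpers that appear in this PR's tests (the database and table names are hypothetical):

```scala
import pl.touk.nussknacker.engine.build.ScenarioBuilder
// assumes the .spel string extension and TestScenarioRunner helpers used in the tests below

val scenario = ScenarioBuilder
  .streaming("example")
  // previously this would have been `$nuCatalog`.`default`.`my_table`, which Iceberg rejected
  .source("start", "table", "Table" -> "'`_nu_catalog`.`default`.`my_table`'".spel)
  .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#input".spel)
```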
@@ -1 +1 @@
pl.touk.nussknacker.engine.flink.table.definition.MockableCatalogFactory
pl.touk.nussknacker.engine.flink.table.definition.StubbedCatalogFactory

This file was deleted.

@@ -0,0 +1,44 @@
package pl.touk.nussknacker.engine.flink.table.definition

import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.catalog.{Catalog, CatalogTable, GenericInMemoryCatalog, ObjectPath}
import org.apache.flink.table.factories.CatalogFactory

import scala.jdk.CollectionConverters._

class StubbedCatalogFactory extends CatalogFactory {

override def factoryIdentifier(): String = StubbedCatalogFactory.catalogName

override def createCatalog(context: CatalogFactory.Context): Catalog = StubbedCatalogFactory.catalog

}

object StubbedCatalogFactory {

val catalogName = "stubbed"

val sampleBoundedTablePath: ObjectPath = ObjectPath.fromString("default.sample_bounded_table")

val sampleBoundedTableNumberOfRows: Int = 10

val sampleColumnName = "fooColumn"

private val catalog: GenericInMemoryCatalog = populateCatalog(new GenericInMemoryCatalog(catalogName))

private def populateCatalog(inMemoryCatalog: GenericInMemoryCatalog): GenericInMemoryCatalog = {
val sampleBoundedTable = CatalogTable.of(
Schema.newBuilder().column(sampleColumnName, DataTypes.STRING()).build(),
null, // no table comment
List.empty[String].asJava,
Map(
"connector" -> "datagen",
// to make it bounded
"number-of-rows" -> sampleBoundedTableNumberOfRows.toString
).asJava
)
inMemoryCatalog.createTable(sampleBoundedTablePath, sampleBoundedTable, false)
inMemoryCatalog
}

}
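A note on wiring: Flink resolves catalog factories through Java SPI, matching a catalog configuration's `type` option against `factoryIdentifier()`; that is why the services-file entry earlier in this diff swaps MockableCatalogFactory for StubbedCatalogFactory. A minimal selection sketch mirroring the test change below:

```scala
import org.apache.flink.configuration.Configuration
import scala.jdk.CollectionConverters._

// "type" -> "stubbed" matches StubbedCatalogFactory.factoryIdentifier(),
// so Flink instantiates the stubbed catalog via SPI discovery.
val catalogConfiguration =
  Configuration.fromMap(Map("type" -> StubbedCatalogFactory.catalogName).asJava)
```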
@@ -107,23 +107,19 @@ class TablesDefinitionDiscoveryTest
}

test("use catalog configuration in data definition") {
val catalogConfiguration = Configuration.fromMap(Map("type" -> "mockable").asJava)
val catalogTable = CatalogTable.of(
Schema.newBuilder().column("fooColumn", DataTypes.STRING()).build(),
null,
List.empty[String].asJava,
Map.empty[String, String].asJava
)
MockableCatalogFactory.resetCatalog()
MockableCatalogFactory.catalog.createTable(ObjectPath.fromString("default.fooTable"), catalogTable, false)
val flinkDataDefinition = FlinkDataDefinition.create(None, Some(catalogConfiguration)).validValue
val catalogConfiguration = Configuration.fromMap(Map("type" -> StubbedCatalogFactory.catalogName).asJava)
val flinkDataDefinition = FlinkDataDefinition.create(None, Some(catalogConfiguration)).validValue

val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue

val tableDefinition = discovery.listTables.loneElement

tableDefinition.tableId.toString shouldBe s"`${FlinkDataDefinition.internalCatalogName}`.`default`.`fooTable`"
tableDefinition.schema shouldBe ResolvedSchema.of(Column.physical("fooColumn", DataTypes.STRING()))
tableDefinition.tableId.toString shouldBe s"`${FlinkDataDefinition.internalCatalogName}`." +
s"`${StubbedCatalogFactory.sampleBoundedTablePath.getDatabaseName}`." +
s"`${StubbedCatalogFactory.sampleBoundedTablePath.getObjectName}`"
tableDefinition.schema shouldBe ResolvedSchema.of(
Column.physical(StubbedCatalogFactory.sampleColumnName, DataTypes.STRING())
)
}

}
@@ -6,10 +6,12 @@ import org.apache.flink.types.Row
import org.scalatest.LoneElement
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData, SqlFilteringExpression}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider
import pl.touk.nussknacker.engine.flink.table.definition.{FlinkDataDefinition, StubbedCatalogFactory}
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode
@@ -65,7 +67,7 @@ class TableSourceTest
.withExtraComponents(tableComponents)
.build()

test("be possible to user table declared inside a database other than the default one") {
test("be possible to use table declared inside a database other than the default one") {
val scenario = ScenarioBuilder
.streaming("test")
.source("start", "table", "Table" -> s"'`default_catalog`.`testdb`.`tablewithqualifiedname`'".spel)
@@ -76,4 +78,69 @@
result.successes.loneElement
}

test("be possible to use nodes deployment data") {
val scenario = ScenarioBuilder
.streaming("test")
.source("start", "table", "Table" -> s"'`default_catalog`.`testdb`.`tablewithqualifiedname`'".spel)
.emptySink(s"end", TestScenarioRunner.testResultSink, "value" -> "#input".spel)

val result = runner
.runWithoutData[Row](
scenario,
nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true")))
)
.validValue
result.errors shouldBe empty
result.successes.loneElement
}

test("be possible combine nodes deployment data with catalogs configuration") {
val configWithCatalogConfiguration = ConfigFactory.parseString(
s"""catalogConfiguration {
| type: ${StubbedCatalogFactory.catalogName}
|}""".stripMargin
)

val tableComponentsBasedOnCatalogConfiguration: List[ComponentDefinition] =
new FlinkTableComponentProvider().create(
configWithCatalogConfiguration,
ProcessObjectDependencies.withConfig(configWithCatalogConfiguration)
)

val runnerWithCatalogConfiguration: FlinkTestScenarioRunner = TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.withExecutionMode(ExecutionMode.Batch)
.withExtraComponents(tableComponentsBasedOnCatalogConfiguration)
.build()

val scenario = ScenarioBuilder
.streaming("test")
.source(
"start",
"table",
"Table" -> (s"'`${FlinkDataDefinition.internalCatalogName}`." +
s"`${StubbedCatalogFactory.sampleBoundedTablePath.getDatabaseName}`." +
s"`${StubbedCatalogFactory.sampleBoundedTablePath.getObjectName}`'").spel
)
.emptySink(s"end", TestScenarioRunner.testResultSink, "value" -> "#input".spel)

val resultWithoutFiltering = runnerWithCatalogConfiguration
.runWithoutData[Row](
scenario,
nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true")))
)
.validValue
resultWithoutFiltering.errors shouldBe empty
resultWithoutFiltering.successes should have size StubbedCatalogFactory.sampleBoundedTableNumberOfRows

val resultWithFiltering = runnerWithCatalogConfiguration
.runWithoutData[Row](
scenario,
nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = false")))
)
.validValue
resultWithFiltering.errors shouldBe empty
resultWithFiltering.successes shouldBe empty
}

}
@@ -42,7 +42,9 @@ class FlinkDataDefinition private (

object FlinkDataDefinition {

private[definition] val internalCatalogName = "$nuCatalog"
// We can't use the dollar ($) character in this name because some catalogs,
// such as Apache Iceberg, use it internally to split object paths.
private[table] val internalCatalogName = "_nu_catalog"

def create(
sqlStatements: Option[List[String]],
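To illustrate the comment above (Iceberg behavior, not part of this PR): Iceberg reserves `$` for metadata-table suffixes such as `my_table$snapshots`, so its path handling treats `$` as a separator, and a catalog literally named `$nuCatalog` failed identifier validation. A hedged sketch with hypothetical database and table names:

```scala
// Within the nussknacker table package, qualified paths are now built like this;
// the underscore-prefixed catalog name contains no '$' for Iceberg to split on.
val qualifiedPath = s"`${FlinkDataDefinition.internalCatalogName}`.`default`.`my_table`"
// => "`_nu_catalog`.`default`.`my_table`"
```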
@@ -4,7 +4,7 @@ import com.typesafe.config.{Config, ConfigValueFactory}
import org.apache.flink.api.connector.source.Boundedness
import pl.touk.nussknacker.defaultmodel.DefaultConfigCreator
import pl.touk.nussknacker.engine.api.ProcessVersion
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, NodesDeploymentData}
import pl.touk.nussknacker.engine.api.process.{ComponentUseCase, SourceFactory}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown}
import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess
@@ -74,17 +74,23 @@ class FlinkTestScenarioRunner(
) extends ClassBasedTestScenarioRunner {

override def runWithData[I: ClassTag, R](scenario: CanonicalProcess, data: List[I]): RunnerListResult[R] = {
runWithTestSourceComponent(scenario, testDataSourceComponent(data, Typed.typedClass[I], None))
runWithTestSourceComponent(
scenario,
NodesDeploymentData.empty,
testDataSourceComponent(data, Typed.typedClass[I], None)
)
}

def runWithData[I: ClassTag, R](
scenario: CanonicalProcess,
data: List[I],
boundedness: Boundedness = Boundedness.CONTINUOUS_UNBOUNDED,
timestampAssigner: Option[TimestampWatermarkHandler[I]] = None
timestampAssigner: Option[TimestampWatermarkHandler[I]] = None,
nodesData: NodesDeploymentData = NodesDeploymentData.empty
): RunnerListResult[R] = {
runWithTestSourceComponent(
scenario,
nodesData,
testDataSourceComponent(data, Typed.typedClass[I], timestampAssigner, boundedness)
)
}
@@ -94,56 +100,67 @@
data: List[I],
inputType: TypingResult,
boundedness: Boundedness = Boundedness.CONTINUOUS_UNBOUNDED,
timestampAssigner: Option[TimestampWatermarkHandler[I]] = None
timestampAssigner: Option[TimestampWatermarkHandler[I]] = None,
nodesData: NodesDeploymentData = NodesDeploymentData.empty
): RunnerListResult[R] = {
runWithTestSourceComponent(
scenario,
nodesData,
testDataSourceComponent(data, inputType, timestampAssigner, boundedness)
)
}

private def runWithTestSourceComponent[I: ClassTag, R](
scenario: CanonicalProcess,
nodesData: NodesDeploymentData = NodesDeploymentData.empty,
testDataSourceComponent: ComponentDefinition
): RunnerListResult[R] = {
val testComponents = testDataSourceComponent :: noopSourceComponent :: Nil
Using.resource(
TestExtensionsHolder
.registerTestExtensions(components ++ testComponents, testResultSinkComponentCreator :: Nil, globalVariables)
) { testComponentHolder =>
run[R](scenario, testComponentHolder)
run[R](scenario, nodesData, testComponentHolder)
}
}

/**
* Can be used to test Flink bounded sources - we wait for the scenario to finish.
*/
def runWithoutData[R](scenario: CanonicalProcess): RunnerListResult[R] = {
def runWithoutData[R](
scenario: CanonicalProcess,
nodesData: NodesDeploymentData = NodesDeploymentData.empty
): RunnerListResult[R] = {
val testComponents = noopSourceComponent :: Nil
Using.resource(
TestExtensionsHolder
.registerTestExtensions(components ++ testComponents, testResultSinkComponentCreator :: Nil, globalVariables)
) { testComponentHolder =>
run[R](scenario, testComponentHolder)
run[R](scenario, nodesData, testComponentHolder)
}
}

/**
* Can be used to test Flink based sinks.
*/
def runWithDataIgnoringResults[I: ClassTag](scenario: CanonicalProcess, data: List[I]): RunnerResultUnit = {
def runWithDataIgnoringResults[I: ClassTag](
scenario: CanonicalProcess,
data: List[I],
nodesData: NodesDeploymentData = NodesDeploymentData.empty
): RunnerResultUnit = {
val testComponents = testDataSourceComponent(data, Typed.typedClass[I], None) :: noopSourceComponent :: Nil
Using.resource(
TestExtensionsHolder.registerTestExtensions(components ++ testComponents, List.empty, globalVariables)
) { testComponentHolder =>
run[AnyRef](scenario, testComponentHolder).map { case RunListResult(errors, _) =>
run[AnyRef](scenario, nodesData, testComponentHolder).map { case RunListResult(errors, _) =>
RunUnitResult(errors)
}
}
}

private def run[OUTPUT](
scenario: CanonicalProcess,
nodesData: NodesDeploymentData,
testExtensionsHolder: TestExtensionsHolder
): RunnerListResult[OUTPUT] = {
val modelData = LocalModelData(
@@ -186,7 +203,7 @@ class FlinkTestScenarioRunner(
env,
scenario,
ProcessVersion.empty,
DeploymentData.empty,
DeploymentData.empty.copy(nodesData = nodesData),
testScenarioCollectorHandler.resultCollector
)

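With these changes, every run* variant can carry per-node deployment data into the executed job. A usage sketch, assuming a runner and a scenario with a source node named "start" built as in the tests above (`.validValue` comes from the validated-matchers helpers those tests use):

```scala
import org.apache.flink.types.Row
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.component.{NodesDeploymentData, SqlFilteringExpression}

// The runner forwards nodesData via DeploymentData.empty.copy(nodesData = nodesData),
// so a per-node SQL filter behaves exactly as it would on a real deployment.
val result = runner
  .runWithoutData[Row](
    scenario,
    nodesData = NodesDeploymentData(Map(NodeId("start") -> SqlFilteringExpression("true = true")))
  )
  .validValue
```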

